/*!
 * Copyright (c) 2016 Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See LICENSE file in the project root for license information.
 */
#include <cstdlib>
#include <cstring>

#include <LightGBM/network.h>

#include <LightGBM/utils/common.h>

#include "linkers.h"

namespace LightGBM {

// Static member definitions for the Network interface.
// All state is THREAD_LOCAL so that multiple independent distributed
// sessions can coexist, one per thread.
THREAD_LOCAL int Network::num_machines_ = 1;
THREAD_LOCAL int Network::rank_ = 0;
// Underlying socket/MPI link layer; empty until Init() is called.
THREAD_LOCAL std::unique_ptr<Linkers> Network::linkers_;
// Precomputed communication schedules for the collective algorithms.
THREAD_LOCAL BruckMap Network::bruck_map_;
THREAD_LOCAL RecursiveHalvingMap Network::recursive_halving_map_;
// Scratch block layout reused by the collective calls.
THREAD_LOCAL std::vector<comm_size_t> Network::block_start_;
THREAD_LOCAL std::vector<comm_size_t> Network::block_len_;
// Internal receive buffer, grown on demand.
THREAD_LOCAL comm_size_t Network::buffer_size_ = 0;
THREAD_LOCAL std::vector<char> Network::buffer_;
// Optional externally supplied collective implementations (e.g. MPI).
THREAD_LOCAL ReduceScatterFunction Network::reduce_scatter_ext_fun_ = nullptr;
THREAD_LOCAL AllgatherFunction Network::allgather_ext_fun_ = nullptr;
/*!
 * \brief Initialize the network layer from a configuration (socket-based setup).
 *        No-op for a single-machine run.
 */
void Network::Init(Config config) {
  if (config.num_machines <= 1) {
    return;  // nothing to set up on a single machine
  }
  // Establish links to all peers and cache the precomputed schedules.
  linkers_.reset(new Linkers(config));
  rank_ = linkers_->rank();
  num_machines_ = linkers_->num_machines();
  bruck_map_ = linkers_->bruck_map();
  recursive_halving_map_ = linkers_->recursive_halving_map();
  // One block-layout slot per machine, zero-initialized.
  block_start_.assign(num_machines_, 0);
  block_len_.assign(num_machines_, 0);
  // Start with a 1MB scratch buffer; collectives grow it as needed.
  buffer_size_ = 1024 * 1024;
  buffer_.resize(buffer_size_);
  Log::Info("Local rank: %d, total number of machines: %d", rank_, num_machines_);
}
/*!
 * \brief Initialize the network layer with externally provided collective
 *        functions (e.g. an MPI backend) instead of the built-in socket linkers.
 * \param rank This machine's rank
 * \param num_machines Total machine count; <= 1 makes this a no-op
 * \param reduce_scatter_ext_fun External reduce-scatter implementation
 * \param allgather_ext_fun External all-gather implementation
 */
void Network::Init(int num_machines, int rank,
                   ReduceScatterFunction reduce_scatter_ext_fun, AllgatherFunction allgather_ext_fun) {
  if (num_machines <= 1) {
    return;  // single machine: nothing to initialize
  }
  rank_ = rank;
  num_machines_ = num_machines;
  // One block-layout slot per machine, zero-initialized.
  block_start_.assign(num_machines_, 0);
  block_len_.assign(num_machines_, 0);
  // Start with a 1MB scratch buffer; collectives grow it as needed.
  buffer_size_ = 1024 * 1024;
  buffer_.resize(buffer_size_);
  // All collectives will be routed through these callbacks.
  reduce_scatter_ext_fun_ = reduce_scatter_ext_fun;
  allgather_ext_fun_ = allgather_ext_fun;
  Log::Info("Local rank: %d, total number of machines: %d", rank_, num_machines_);
}

Guolin Ke's avatar
Guolin Ke committed
61
void Network::Dispose() {
62
63
64
  num_machines_ = 1;
  rank_ = 0;
  linkers_.reset(new Linkers());
65
66
  reduce_scatter_ext_fun_ = nullptr;
  allgather_ext_fun_ = nullptr;
Guolin Ke's avatar
Guolin Ke committed
67
68
}

Guolin Ke's avatar
Guolin Ke committed
69
void Network::Allreduce(char* input, comm_size_t input_size, int type_size, char* output, const ReduceFunction& reducer) {
70
71
72
  if (num_machines_ <= 1) {
    Log::Fatal("Please initilize the network interface first");
  }
Guolin Ke's avatar
Guolin Ke committed
73
  comm_size_t count = input_size / type_size;
Guolin Ke's avatar
Guolin Ke committed
74
75
  // if small package or small count , do it by all gather.(reduce the communication times.)
  if (count < num_machines_ || input_size < 4096) {
Guolin Ke's avatar
Guolin Ke committed
76
    AllreduceByAllGather(input, input_size, type_size, output, reducer);
Guolin Ke's avatar
Guolin Ke committed
77
78
79
    return;
  }
  // assign the blocks to every rank.
Guolin Ke's avatar
Guolin Ke committed
80
  comm_size_t step = (count + num_machines_ - 1) / num_machines_;
Guolin Ke's avatar
Guolin Ke committed
81
82
83
84
85
  if (step < 1) {
    step = 1;
  }
  block_start_[0] = 0;
  for (int i = 0; i < num_machines_ - 1; ++i) {
Guolin Ke's avatar
Guolin Ke committed
86
    block_len_[i] = std::min<comm_size_t>(step * type_size, input_size - block_start_[i]);
Guolin Ke's avatar
Guolin Ke committed
87
88
89
90
    block_start_[i + 1] = block_start_[i] + block_len_[i];
  }
  block_len_[num_machines_ - 1] = input_size - block_start_[num_machines_ - 1];
  // do reduce scatter
Guolin Ke's avatar
Guolin Ke committed
91
  ReduceScatter(input, input_size, type_size, block_start_.data(), block_len_.data(), output, input_size, reducer);
Guolin Ke's avatar
Guolin Ke committed
92
  // do all gather
Guolin Ke's avatar
Guolin Ke committed
93
  Allgather(output, block_start_.data(), block_len_.data(), output, input_size);
Guolin Ke's avatar
Guolin Ke committed
94
95
}

Guolin Ke's avatar
Guolin Ke committed
96
void Network::AllreduceByAllGather(char* input, comm_size_t input_size, int type_size, char* output, const ReduceFunction& reducer) {
97
98
99
  if (num_machines_ <= 1) {
    Log::Fatal("Please initilize the network interface first");
  }
Guolin Ke's avatar
Guolin Ke committed
100
  // assign blocks
Guolin Ke's avatar
Guolin Ke committed
101
  comm_size_t all_size = input_size * num_machines_;
Guolin Ke's avatar
Guolin Ke committed
102
103
104
105
106
107
108
109
110
  block_start_[0] = 0;
  block_len_[0] = input_size;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
    block_len_[i] = input_size;
  }
  // need use buffer here, since size of "output" is smaller than size after all gather
  if (input_size*num_machines_ > buffer_size_) {
    buffer_size_ = input_size*num_machines_;
Guolin Ke's avatar
Guolin Ke committed
111
    buffer_.resize(buffer_size_);
Guolin Ke's avatar
Guolin Ke committed
112
113
  }

Guolin Ke's avatar
Guolin Ke committed
114
  Allgather(input, block_start_.data(), block_len_.data(), buffer_.data(), all_size);
Guolin Ke's avatar
Guolin Ke committed
115
  for (int i = 1; i < num_machines_; ++i) {
Guolin Ke's avatar
Guolin Ke committed
116
    reducer(buffer_.data() + block_start_[i], buffer_.data() + block_start_[0], type_size, input_size);
Guolin Ke's avatar
Guolin Ke committed
117
118
  }
  // copy back
Guolin Ke's avatar
Guolin Ke committed
119
  std::memcpy(output, buffer_.data(), input_size);
Guolin Ke's avatar
Guolin Ke committed
120
121
}

Guolin Ke's avatar
Guolin Ke committed
122
void Network::Allgather(char* input, comm_size_t send_size, char* output) {
123
124
  if (num_machines_ <= 1) {
    Log::Fatal("Please initilize the network interface first");
125
    return;
126
  }
Guolin Ke's avatar
Guolin Ke committed
127
128
129
130
131
132
133
134
  // assign blocks
  block_start_[0] = 0;
  block_len_[0] = send_size;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
    block_len_[i] = send_size;
  }
  // start all gather
Guolin Ke's avatar
Guolin Ke committed
135
  Allgather(input, block_start_.data(), block_len_.data(), output, send_size * num_machines_);
Guolin Ke's avatar
Guolin Ke committed
136
137
}

Guolin Ke's avatar
Guolin Ke committed
138
void Network::Allgather(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t all_size) {
139
140
141
  if (num_machines_ <= 1) {
    Log::Fatal("Please initilize the network interface first");
  }
142
  if (allgather_ext_fun_ != nullptr) {
Guolin Ke's avatar
Guolin Ke committed
143
    return allgather_ext_fun_(input, block_len[rank_], block_start, block_len, num_machines_, output, all_size);
144
  }
145
  const comm_size_t kRingThreshold = 10 * 1024 * 1024;  // 10MB
Guolin Ke's avatar
Guolin Ke committed
146
147
148
149
150
  const int kRingNodeThreshold = 64;
  if (all_size > kRingThreshold && num_machines_ < kRingNodeThreshold) {
    // when num_machines is small and data is large
    AllgatherRing(input, block_start, block_len, output, all_size);
  } else if (recursive_halving_map_.is_power_of_2) {
Guolin Ke's avatar
Guolin Ke committed
151
152
    AllgatherRecursiveDoubling(input, block_start, block_len, output, all_size);
  } else {
Guolin Ke's avatar
Guolin Ke committed
153
    AllgatherBruck(input, block_start, block_len, output, all_size);
Guolin Ke's avatar
Guolin Ke committed
154
155
156
157
  }
}

void Network::AllgatherBruck(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t all_size) {
Guolin Ke's avatar
Guolin Ke committed
158
  comm_size_t write_pos = 0;
Guolin Ke's avatar
Guolin Ke committed
159
160
161
162
163
164
  // use output as receive buffer
  std::memcpy(output, input, block_len[rank_]);
  write_pos += block_len[rank_];
  int accumulated_block = 1;
  for (int i = 0; i < bruck_map_.k; ++i) {
    // get current local block size
Guolin Ke's avatar
Guolin Ke committed
165
    int cur_block_size = std::min(1 << i, num_machines_ - accumulated_block);
Guolin Ke's avatar
Guolin Ke committed
166
167
168
169
    // get out rank
    int out_rank = bruck_map_.out_ranks[i];
    // get in rank
    int in_rank = bruck_map_.in_ranks[i];
Guolin Ke's avatar
Guolin Ke committed
170
    // get send information
Guolin Ke's avatar
Guolin Ke committed
171
    comm_size_t need_send_len = 0;
Guolin Ke's avatar
Guolin Ke committed
172
    // get recv information
Guolin Ke's avatar
Guolin Ke committed
173
    comm_size_t need_recv_len = 0;
Guolin Ke's avatar
Guolin Ke committed
174
    for (int j = 0; j < cur_block_size; ++j) {
Guolin Ke's avatar
Guolin Ke committed
175
176
      need_send_len += block_len[(rank_ + j) % num_machines_];
      need_recv_len += block_len[(rank_ + accumulated_block + j) % num_machines_];
Guolin Ke's avatar
Guolin Ke committed
177
178
    }
    // send and recv at same time
Guolin Ke's avatar
Guolin Ke committed
179
180
    linkers_->SendRecv(out_rank, output, need_send_len, in_rank, output + write_pos, need_recv_len);
    write_pos += need_recv_len;
Guolin Ke's avatar
Guolin Ke committed
181
182
183
184
185
186
187
188
    accumulated_block += cur_block_size;
  }
  // rotate in-place
  std::reverse<char*>(output, output + all_size);
  std::reverse<char*>(output, output + block_start[rank_]);
  std::reverse<char*>(output + block_start[rank_], output + all_size);
}

Guolin Ke's avatar
Guolin Ke committed
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
void Network::AllgatherRecursiveDoubling(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t) {
  // use output as receive buffer
  std::memcpy(output + block_start[rank_], input, block_len[rank_]);
  for (int i = 0; i < bruck_map_.k; ++i) {
    // get current local block size
    int cur_step = 1 << i;
    const int vgroup = rank_ / cur_step;
    const int vrank = vgroup * cur_step;
    int target = rank_ + cur_step;
    int target_vrank = (vgroup + 1) * cur_step;
    if (vgroup & 1) {
      target = rank_ - cur_step;
      target_vrank = (vgroup - 1) * cur_step;
    }
    // get send information
    comm_size_t need_send_len = 0;
    // get recv information
    comm_size_t need_recv_len = 0;
    for (int j = 0; j < cur_step; ++j) {
      need_send_len += block_len[(vrank + j)];
      need_recv_len += block_len[(target_vrank + j)];
    }
    // send and recv at same time
Guolin Ke's avatar
Guolin Ke committed
212
    linkers_->SendRecv(target, output + block_start[vrank], need_send_len,
Guolin Ke's avatar
Guolin Ke committed
213
214
215
216
217
218
219
220
221
                       target, output + block_start[target_vrank], need_recv_len);
  }
}

void Network::AllgatherRing(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t) {
  // use output as receive buffer
  std::memcpy(output + block_start[rank_], input, block_len[rank_]);
  int out_rank = (rank_ + 1) % num_machines_;
  int in_rank = (rank_ - 1 + num_machines_) % num_machines_;
Guolin Ke's avatar
Guolin Ke committed
222
223
  int out_block = rank_;
  int in_block = in_rank;
Guolin Ke's avatar
Guolin Ke committed
224
225
  for (int i = 1; i < num_machines_; ++i) {
    // send and recv at same time
Guolin Ke's avatar
Guolin Ke committed
226
227
228
229
    linkers_->SendRecv(out_rank, output + block_start[out_block], block_len[out_block],
                       in_rank, output + block_start[in_block], block_len[in_block]);
    out_block = (out_block - 1 + num_machines_) % num_machines_;
    in_block = (in_block - 1 + num_machines_) % num_machines_;
Guolin Ke's avatar
Guolin Ke committed
230
231
232
  }
}

Guolin Ke's avatar
Guolin Ke committed
233
234
235
void Network::ReduceScatter(char* input, comm_size_t input_size, int type_size,
                            const comm_size_t* block_start, const comm_size_t* block_len, char* output,
                            comm_size_t output_size, const ReduceFunction& reducer) {
236
237
238
  if (num_machines_ <= 1) {
    Log::Fatal("Please initilize the network interface first");
  }
239
  if (reduce_scatter_ext_fun_ != nullptr) {
Guolin Ke's avatar
Guolin Ke committed
240
    return reduce_scatter_ext_fun_(input, input_size, type_size, block_start, block_len, num_machines_, output, output_size, reducer);
ww's avatar
ww committed
241
  }
242
  const comm_size_t kRingThreshold = 10 * 1024 * 1024;  // 10MB
Guolin Ke's avatar
Guolin Ke committed
243
244
  if (recursive_halving_map_.is_power_of_2 || input_size < kRingThreshold) {
    ReduceScatterRecursiveHalving(input, input_size, type_size, block_start, block_len, output, output_size, reducer);
Guolin Ke's avatar
Guolin Ke committed
245
  } else {
Guolin Ke's avatar
Guolin Ke committed
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
    ReduceScatterRing(input, input_size, type_size, block_start, block_len, output, output_size, reducer);
  }
}

void Network::ReduceScatterRecursiveHalving(char* input, comm_size_t input_size, int type_size,
                                            const comm_size_t* block_start, const comm_size_t* block_len, char* output,
                                            comm_size_t, const ReduceFunction& reducer) {
  if (!recursive_halving_map_.is_power_of_2) {
    if (recursive_halving_map_.type == RecursiveHalvingNodeType::Other) {
      // send local data to neighbor first
      linkers_->Send(recursive_halving_map_.neighbor, input, input_size);
    } else if (recursive_halving_map_.type == RecursiveHalvingNodeType::GroupLeader) {
      // receive neighbor data first
      int need_recv_cnt = input_size;
      linkers_->Recv(recursive_halving_map_.neighbor, output, need_recv_cnt);
      // reduce
      reducer(output, input, type_size, input_size);
    }
  }
  if (recursive_halving_map_.type != RecursiveHalvingNodeType::Other) {
Guolin Ke's avatar
Guolin Ke committed
266
267
268
    for (int i = 0; i < recursive_halving_map_.k; ++i) {
      // get target
      int target = recursive_halving_map_.ranks[i];
Guolin Ke's avatar
Guolin Ke committed
269
270
      comm_size_t send_block_start = recursive_halving_map_.send_block_start[i];
      comm_size_t recv_block_start = recursive_halving_map_.recv_block_start[i];
Guolin Ke's avatar
Guolin Ke committed
271
      // get send information
Guolin Ke's avatar
Guolin Ke committed
272
      comm_size_t send_size = 0;
Guolin Ke's avatar
Guolin Ke committed
273
274
275
276
      for (int j = 0; j < recursive_halving_map_.send_block_len[i]; ++j) {
        send_size += block_len[send_block_start + j];
      }
      // get recv information
Guolin Ke's avatar
Guolin Ke committed
277
      comm_size_t need_recv_cnt = 0;
Guolin Ke's avatar
Guolin Ke committed
278
279
280
281
282
283
      for (int j = 0; j < recursive_halving_map_.recv_block_len[i]; ++j) {
        need_recv_cnt += block_len[recv_block_start + j];
      }
      // send and recv at same time
      linkers_->SendRecv(target, input + block_start[send_block_start], send_size, target, output, need_recv_cnt);
      // reduce
Guolin Ke's avatar
Guolin Ke committed
284
      reducer(output, input + block_start[recv_block_start], type_size, need_recv_cnt);
Guolin Ke's avatar
Guolin Ke committed
285
286
    }
  }
Guolin Ke's avatar
Guolin Ke committed
287
288
289
290
291
292
293
294
295
296
297
298
299
  if (!recursive_halving_map_.is_power_of_2) {
    if (recursive_halving_map_.type == RecursiveHalvingNodeType::GroupLeader) {
      // send result to neighbor
      linkers_->Send(recursive_halving_map_.neighbor,
                     input + block_start[recursive_halving_map_.neighbor],
                     block_len[recursive_halving_map_.neighbor]);
    } else if (recursive_halving_map_.type == RecursiveHalvingNodeType::Other) {
      // receive result from neighbor
      int need_recv_cnt = block_len[rank_];
      linkers_->Recv(recursive_halving_map_.neighbor, output, need_recv_cnt);
      return;
    }
  }
Guolin Ke's avatar
Guolin Ke committed
300
301
302
303
  // copy result
  std::memcpy(output, input + block_start[rank_], block_len[rank_]);
}

Guolin Ke's avatar
Guolin Ke committed
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/*!
 * \brief Reduce-scatter via a ring: each round passes one block to the
 *        successor while the incoming block from the predecessor is reduced
 *        into "input", so partial sums accumulate as blocks travel the ring.
 *        After num_machines_-1 rounds our own block holds the full reduction.
 */
void Network::ReduceScatterRing(char* input, comm_size_t, int type_size,
                                const comm_size_t* block_start, const comm_size_t* block_len, char* output,
                                comm_size_t, const ReduceFunction& reducer) {
  const int next_rank = (rank_ + 1) % num_machines_;
  const int prev_rank = (rank_ - 1 + num_machines_) % num_machines_;
  // First forward the predecessor's block; first receive the one before it.
  int send_block = prev_rank;
  int recv_block = (prev_rank - 1 + num_machines_) % num_machines_;
  for (int round = 1; round < num_machines_; ++round) {
    // Receive into "output" (scratch), then fold into "input" so the next
    // forward of that block carries the accumulated partial sum.
    linkers_->SendRecv(next_rank, input + block_start[send_block], block_len[send_block],
                       prev_rank, output, block_len[recv_block]);
    reducer(output, input + block_start[recv_block], type_size, block_len[recv_block]);
    send_block = (send_block - 1 + num_machines_) % num_machines_;
    recv_block = (recv_block - 1 + num_machines_) % num_machines_;
  }
  // Our own block now holds the fully reduced value; hand it to the caller.
  std::memcpy(output, input + block_start[rank_], block_len[rank_]);
}

}  // namespace LightGBM