network.cpp 13.5 KB
Newer Older
1
2
3
4
/*!
 * Copyright (c) 2016 Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See LICENSE file in the project root for license information.
 */
5
6
7
8
#include <LightGBM/network.h>

#include <LightGBM/utils/common.h>

9
#include <algorithm>
10
11
#include <cstdlib>
#include <cstring>
12
13
#include <memory>
#include <vector>
14

15
#include "linkers.h"
Guolin Ke's avatar
Guolin Ke committed
16
17
18

namespace LightGBM {

Hui Xue's avatar
Hui Xue committed
19
// static member definition
// All network state is THREAD_LOCAL so that several in-process "machines"
// (e.g. tests or language bindings driving multiple workers from one process)
// can hold independent network contexts.
// total number of machines in the cluster; 1 means "not distributed"
THREAD_LOCAL int Network::num_machines_ = 1;
// rank (0-based index) of this machine within the cluster
THREAD_LOCAL int Network::rank_ = 0;
// socket/MPI communication backend; set up by Init(Config)
THREAD_LOCAL std::unique_ptr<Linkers> Network::linkers_;
// precomputed send/recv partner schedule for the Bruck all-gather
THREAD_LOCAL BruckMap Network::bruck_map_;
// precomputed schedule for recursive-halving reduce-scatter
THREAD_LOCAL RecursiveHalvingMap Network::recursive_halving_map_;
// scratch arrays (one entry per machine) describing byte offsets / lengths
// of each rank's block inside a collective-operation buffer
THREAD_LOCAL std::vector<comm_size_t> Network::block_start_;
THREAD_LOCAL std::vector<comm_size_t>  Network::block_len_;
// current capacity of buffer_ in bytes
THREAD_LOCAL comm_size_t Network::buffer_size_ = 0;
// internal scratch buffer used when output buffers are too small (e.g. AllreduceByAllGather)
THREAD_LOCAL std::vector<char> Network::buffer_;
// optional externally supplied collectives (e.g. an MPI implementation);
// when non-null they replace the built-in algorithms
THREAD_LOCAL ReduceScatterFunction Network::reduce_scatter_ext_fun_ = nullptr;
THREAD_LOCAL AllgatherFunction Network::allgather_ext_fun_ = nullptr;
ww's avatar
ww committed
31

Guolin Ke's avatar
Guolin Ke committed
32

Guolin Ke's avatar
Guolin Ke committed
33
/*!
 * \brief Initializes the distributed network using the built-in linkers
 *        (socket-based connections described by `config`).
 *
 * Populates rank, machine count, the collective-communication schedules
 * (Bruck / recursive halving) and the per-rank scratch arrays.
 * A no-op when `config.num_machines <= 1` (single-machine run).
 */
void Network::Init(Config config) {
  if (config.num_machines <= 1) {
    return;  // single machine: no network to set up
  }
  linkers_.reset(new Linkers(config));
  num_machines_ = linkers_->num_machines();
  rank_ = linkers_->rank();
  bruck_map_ = linkers_->bruck_map();
  recursive_halving_map_ = linkers_->recursive_halving_map();
  // one block descriptor per machine, zero-initialized
  block_start_.assign(num_machines_, 0);
  block_len_.assign(num_machines_, 0);
  // start with a 1MB scratch buffer; grown on demand by AllreduceByAllGather
  buffer_size_ = 1024 * 1024;
  buffer_.resize(buffer_size_);
  Log::Info("Local rank: %d, total number of machines: %d", rank_, num_machines_);
}

48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/*!
 * \brief Initializes the network with externally provided collective
 *        implementations instead of the built-in linkers.
 * \param num_machines total number of machines; <= 1 makes this a no-op
 * \param rank this machine's 0-based rank
 * \param reduce_scatter_ext_fun external reduce-scatter implementation
 * \param allgather_ext_fun external all-gather implementation
 */
void Network::Init(int num_machines, int rank,
                   ReduceScatterFunction reduce_scatter_ext_fun, AllgatherFunction allgather_ext_fun) {
  if (num_machines <= 1) {
    return;  // single machine: nothing to initialize
  }
  num_machines_ = num_machines;
  rank_ = rank;
  reduce_scatter_ext_fun_ = reduce_scatter_ext_fun;
  allgather_ext_fun_ = allgather_ext_fun;
  // one block descriptor per machine, zero-initialized
  block_start_.assign(num_machines_, 0);
  block_len_.assign(num_machines_, 0);
  // start with a 1MB scratch buffer; grown on demand by AllreduceByAllGather
  buffer_size_ = 1024 * 1024;
  buffer_.resize(buffer_size_);
  Log::Info("Local rank: %d, total number of machines: %d", rank_, num_machines_);
}

Guolin Ke's avatar
Guolin Ke committed
63
void Network::Dispose() {
64
65
66
  num_machines_ = 1;
  rank_ = 0;
  linkers_.reset(new Linkers());
67
68
  reduce_scatter_ext_fun_ = nullptr;
  allgather_ext_fun_ = nullptr;
Guolin Ke's avatar
Guolin Ke committed
69
70
}

Guolin Ke's avatar
Guolin Ke committed
71
/*!
 * \brief All-reduce: after this call every machine's `output` holds the
 *        element-wise reduction (via `reducer`) of all machines' `input`.
 * \param input      local data, `input_size` bytes of elements of `type_size` bytes each
 * \param input_size total payload size in bytes
 * \param type_size  size in bytes of one element
 * \param output     destination buffer, at least `input_size` bytes
 * \param reducer    element-wise reduction callback
 */
void Network::Allreduce(char* input, comm_size_t input_size, int type_size, char* output, const ReduceFunction& reducer) {
  if (num_machines_ <= 1) {
    Log::Fatal("Please initialize the network interface first");
  }
  comm_size_t count = input_size / type_size;
  // For small payloads (or fewer elements than machines, where the data
  // cannot be split into one block per rank), a single all-gather followed by
  // a local reduction needs fewer communication rounds than reduce-scatter.
  if (count < num_machines_ || input_size < 4096) {
    AllreduceByAllGather(input, input_size, type_size, output, reducer);
    return;
  }
  // Partition the elements into one contiguous block per rank:
  // each of the first num_machines_-1 blocks holds up to `step` elements,
  // the last block takes the remainder.
  comm_size_t step = (count + num_machines_ - 1) / num_machines_;
  // defensive: cannot happen here since count >= num_machines_ implies step >= 1
  if (step < 1) {
    step = 1;
  }
  block_start_[0] = 0;
  for (int i = 0; i < num_machines_ - 1; ++i) {
    // clamp so block boundaries never run past the end of the payload
    block_len_[i] = std::min<comm_size_t>(step * type_size, input_size - block_start_[i]);
    block_start_[i + 1] = block_start_[i] + block_len_[i];
  }
  block_len_[num_machines_ - 1] = input_size - block_start_[num_machines_ - 1];
  // Phase 1: reduce-scatter so that rank i owns the fully reduced block i
  ReduceScatter(input, input_size, type_size, block_start_.data(), block_len_.data(), output, input_size, reducer);
  // Phase 2: all-gather the reduced blocks so every rank has the full result
  Allgather(output, block_start_.data(), block_len_.data(), output, input_size);
}

Guolin Ke's avatar
Guolin Ke committed
98
void Network::AllreduceByAllGather(char* input, comm_size_t input_size, int type_size, char* output, const ReduceFunction& reducer) {
99
  if (num_machines_ <= 1) {
James Lamb's avatar
James Lamb committed
100
    Log::Fatal("Please initialize the network interface first");
101
  }
Guolin Ke's avatar
Guolin Ke committed
102
  // assign blocks
Guolin Ke's avatar
Guolin Ke committed
103
  comm_size_t all_size = input_size * num_machines_;
Guolin Ke's avatar
Guolin Ke committed
104
105
106
107
108
109
110
111
112
  block_start_[0] = 0;
  block_len_[0] = input_size;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
    block_len_[i] = input_size;
  }
  // need use buffer here, since size of "output" is smaller than size after all gather
  if (input_size*num_machines_ > buffer_size_) {
    buffer_size_ = input_size*num_machines_;
Guolin Ke's avatar
Guolin Ke committed
113
    buffer_.resize(buffer_size_);
Guolin Ke's avatar
Guolin Ke committed
114
115
  }

Guolin Ke's avatar
Guolin Ke committed
116
  Allgather(input, block_start_.data(), block_len_.data(), buffer_.data(), all_size);
Guolin Ke's avatar
Guolin Ke committed
117
  for (int i = 1; i < num_machines_; ++i) {
Guolin Ke's avatar
Guolin Ke committed
118
    reducer(buffer_.data() + block_start_[i], buffer_.data() + block_start_[0], type_size, input_size);
Guolin Ke's avatar
Guolin Ke committed
119
120
  }
  // copy back
Guolin Ke's avatar
Guolin Ke committed
121
  std::memcpy(output, buffer_.data(), input_size);
Guolin Ke's avatar
Guolin Ke committed
122
123
}

Guolin Ke's avatar
Guolin Ke committed
124
void Network::Allgather(char* input, comm_size_t send_size, char* output) {
125
  if (num_machines_ <= 1) {
James Lamb's avatar
James Lamb committed
126
    Log::Fatal("Please initialize the network interface first");
127
    return;
128
  }
Guolin Ke's avatar
Guolin Ke committed
129
130
131
132
133
134
135
136
  // assign blocks
  block_start_[0] = 0;
  block_len_[0] = send_size;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
    block_len_[i] = send_size;
  }
  // start all gather
Guolin Ke's avatar
Guolin Ke committed
137
  Allgather(input, block_start_.data(), block_len_.data(), output, send_size * num_machines_);
Guolin Ke's avatar
Guolin Ke committed
138
139
}

Guolin Ke's avatar
Guolin Ke committed
140
void Network::Allgather(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t all_size) {
141
  if (num_machines_ <= 1) {
James Lamb's avatar
James Lamb committed
142
    Log::Fatal("Please initialize the network interface first");
143
  }
144
  if (allgather_ext_fun_ != nullptr) {
Guolin Ke's avatar
Guolin Ke committed
145
    return allgather_ext_fun_(input, block_len[rank_], block_start, block_len, num_machines_, output, all_size);
146
  }
147
  const comm_size_t kRingThreshold = 10 * 1024 * 1024;  // 10MB
Guolin Ke's avatar
Guolin Ke committed
148
149
150
151
152
  const int kRingNodeThreshold = 64;
  if (all_size > kRingThreshold && num_machines_ < kRingNodeThreshold) {
    // when num_machines is small and data is large
    AllgatherRing(input, block_start, block_len, output, all_size);
  } else if (recursive_halving_map_.is_power_of_2) {
Guolin Ke's avatar
Guolin Ke committed
153
154
    AllgatherRecursiveDoubling(input, block_start, block_len, output, all_size);
  } else {
Guolin Ke's avatar
Guolin Ke committed
155
    AllgatherBruck(input, block_start, block_len, output, all_size);
Guolin Ke's avatar
Guolin Ke committed
156
157
158
159
  }
}

/*!
 * \brief All-gather using the Bruck algorithm: works for any machine count
 *        and finishes in bruck_map_.k (≈ ceil(log2(n))) send/recv rounds.
 *
 * Data is accumulated in `output` in rank-relative order (own block first),
 * then rotated at the end so that block i sits at block_start[i].
 */
void Network::AllgatherBruck(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t all_size) {
  comm_size_t write_pos = 0;
  // use output as receive buffer; own block goes first (rank-relative layout)
  std::memcpy(output, input, block_len[rank_]);
  write_pos += block_len[rank_];
  int accumulated_block = 1;
  for (int i = 0; i < bruck_map_.k; ++i) {
    // number of blocks exchanged this round: doubles each round (2^i),
    // clamped so the total never exceeds num_machines_ blocks
    int cur_block_size = std::min(1 << i, num_machines_ - accumulated_block);
    // partner we send to this round (from the precomputed schedule)
    int out_rank = bruck_map_.out_ranks[i];
    // partner we receive from this round
    int in_rank = bruck_map_.in_ranks[i];
    // bytes we send: the blocks we have accumulated so far, starting at our own
    comm_size_t need_send_len = 0;
    // bytes we receive: the next cur_block_size blocks after those we hold
    comm_size_t need_recv_len = 0;
    for (int j = 0; j < cur_block_size; ++j) {
      need_send_len += block_len[(rank_ + j) % num_machines_];
      need_recv_len += block_len[(rank_ + accumulated_block + j) % num_machines_];
    }
    // full-duplex exchange: send from the front of output, append received
    // data at write_pos
    linkers_->SendRecv(out_rank, output, need_send_len, in_rank, output + write_pos, need_recv_len);
    write_pos += need_recv_len;
    accumulated_block += cur_block_size;
  }
  // Rotate the rank-relative layout into absolute order (block i at
  // block_start[i]) using the classic three-reversal in-place rotation.
  std::reverse<char*>(output, output + all_size);
  std::reverse<char*>(output, output + block_start[rank_]);
  std::reverse<char*>(output + block_start[rank_], output + all_size);
}

Guolin Ke's avatar
Guolin Ke committed
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
/*!
 * \brief All-gather by recursive doubling; requires a power-of-2 machine
 *        count (bruck_map_.k rounds, partners at distance 2^i per round).
 *
 * Each rank writes its own block at its absolute position; each round a rank
 * exchanges everything its virtual group has accumulated with the adjacent
 * group, doubling the gathered span. The last parameter (total size) is
 * unused because block_start/block_len fully describe the layout.
 */
void Network::AllgatherRecursiveDoubling(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t) {
  // use output as receive buffer; own block placed at its final position
  std::memcpy(output + block_start[rank_], input, block_len[rank_]);
  for (int i = 0; i < bruck_map_.k; ++i) {
    // span of blocks held by each virtual group this round
    int cur_step = 1 << i;
    // vgroup: index of the group of cur_step consecutive ranks we belong to;
    // vrank: first rank (= first block) of that group
    const int vgroup = rank_ / cur_step;
    const int vrank = vgroup * cur_step;
    // partner is the mirror rank in the adjacent group: even-numbered groups
    // pair with the group above, odd-numbered with the group below
    int target = rank_ + cur_step;
    int target_vrank = (vgroup + 1) * cur_step;
    if (vgroup & 1) {
      target = rank_ - cur_step;
      target_vrank = (vgroup - 1) * cur_step;
    }
    // bytes we send: all blocks our group currently holds
    comm_size_t need_send_len = 0;
    // bytes we receive: all blocks the partner group holds
    comm_size_t need_recv_len = 0;
    for (int j = 0; j < cur_step; ++j) {
      need_send_len += block_len[(vrank + j)];
      need_recv_len += block_len[(target_vrank + j)];
    }
    // full-duplex exchange; received data lands directly at its final offset
    linkers_->SendRecv(target, output + block_start[vrank], need_send_len,
                       target, output + block_start[target_vrank], need_recv_len);
  }
}

void Network::AllgatherRing(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t) {
  // use output as receive buffer
  std::memcpy(output + block_start[rank_], input, block_len[rank_]);
  int out_rank = (rank_ + 1) % num_machines_;
  int in_rank = (rank_ - 1 + num_machines_) % num_machines_;
Guolin Ke's avatar
Guolin Ke committed
224
225
  int out_block = rank_;
  int in_block = in_rank;
Guolin Ke's avatar
Guolin Ke committed
226
227
  for (int i = 1; i < num_machines_; ++i) {
    // send and recv at same time
Guolin Ke's avatar
Guolin Ke committed
228
229
230
231
    linkers_->SendRecv(out_rank, output + block_start[out_block], block_len[out_block],
                       in_rank, output + block_start[in_block], block_len[in_block]);
    out_block = (out_block - 1 + num_machines_) % num_machines_;
    in_block = (in_block - 1 + num_machines_) % num_machines_;
Guolin Ke's avatar
Guolin Ke committed
232
233
234
  }
}

Guolin Ke's avatar
Guolin Ke committed
235
236
237
void Network::ReduceScatter(char* input, comm_size_t input_size, int type_size,
                            const comm_size_t* block_start, const comm_size_t* block_len, char* output,
                            comm_size_t output_size, const ReduceFunction& reducer) {
238
  if (num_machines_ <= 1) {
James Lamb's avatar
James Lamb committed
239
    Log::Fatal("Please initialize the network interface first");
240
  }
241
  if (reduce_scatter_ext_fun_ != nullptr) {
Guolin Ke's avatar
Guolin Ke committed
242
    return reduce_scatter_ext_fun_(input, input_size, type_size, block_start, block_len, num_machines_, output, output_size, reducer);
ww's avatar
ww committed
243
  }
244
  const comm_size_t kRingThreshold = 10 * 1024 * 1024;  // 10MB
Guolin Ke's avatar
Guolin Ke committed
245
246
  if (recursive_halving_map_.is_power_of_2 || input_size < kRingThreshold) {
    ReduceScatterRecursiveHalving(input, input_size, type_size, block_start, block_len, output, output_size, reducer);
Guolin Ke's avatar
Guolin Ke committed
247
  } else {
Guolin Ke's avatar
Guolin Ke committed
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
    ReduceScatterRing(input, input_size, type_size, block_start, block_len, output, output_size, reducer);
  }
}

/*!
 * \brief Reduce-scatter by recursive halving. For non-power-of-2 clusters,
 *        surplus ranks ("Other") first fold their data into a neighboring
 *        "GroupLeader" so that an effective power-of-2 group runs the halving
 *        rounds; afterwards each Other rank receives its reduced block back
 *        from its leader. The unused parameter is the output buffer size.
 *
 * WARNING: `input` is used as scratch space and is modified by the in-place
 * reductions below.
 */
void Network::ReduceScatterRecursiveHalving(char* input, comm_size_t input_size, int type_size,
                                            const comm_size_t* block_start, const comm_size_t* block_len, char* output,
                                            comm_size_t, const ReduceFunction& reducer) {
  if (!recursive_halving_map_.is_power_of_2) {
    if (recursive_halving_map_.type == RecursiveHalvingNodeType::Other) {
      // surplus rank: hand all local data to the paired group leader and
      // sit out the halving rounds
      linkers_->Send(recursive_halving_map_.neighbor, input, input_size);
    } else if (recursive_halving_map_.type == RecursiveHalvingNodeType::GroupLeader) {
      // leader: absorb the neighbor's data into our own input first
      int need_recv_cnt = input_size;
      linkers_->Recv(recursive_halving_map_.neighbor, output, need_recv_cnt);
      // reduce neighbor's payload (staged in output) into input in-place
      reducer(output, input, type_size, input_size);
    }
  }
  if (recursive_halving_map_.type != RecursiveHalvingNodeType::Other) {
    // halving rounds: each round the block range we are responsible for is
    // cut in half; we ship the half our partner owns and reduce the half we keep
    for (int i = 0; i < recursive_halving_map_.k; ++i) {
      // partner rank for this round (precomputed schedule)
      int target = recursive_halving_map_.ranks[i];
      // first block index of the range to send / to receive this round
      comm_size_t send_block_start = recursive_halving_map_.send_block_start[i];
      comm_size_t recv_block_start = recursive_halving_map_.recv_block_start[i];
      // total bytes to send: sum over the scheduled send-block range
      comm_size_t send_size = 0;
      for (int j = 0; j < recursive_halving_map_.send_block_len[i]; ++j) {
        send_size += block_len[send_block_start + j];
      }
      // total bytes to receive: sum over the scheduled recv-block range
      comm_size_t need_recv_cnt = 0;
      for (int j = 0; j < recursive_halving_map_.recv_block_len[i]; ++j) {
        need_recv_cnt += block_len[recv_block_start + j];
      }
      // full-duplex exchange; received data staged in output
      linkers_->SendRecv(target, input + block_start[send_block_start], send_size, target, output, need_recv_cnt);
      // fold the received partial sums into input at the kept range
      reducer(output, input + block_start[recv_block_start], type_size, need_recv_cnt);
    }
  }
  if (!recursive_halving_map_.is_power_of_2) {
    if (recursive_halving_map_.type == RecursiveHalvingNodeType::GroupLeader) {
      // hand the surplus neighbor its fully reduced block
      linkers_->Send(recursive_halving_map_.neighbor,
                     input + block_start[recursive_halving_map_.neighbor],
                     block_len[recursive_halving_map_.neighbor]);
    } else if (recursive_halving_map_.type == RecursiveHalvingNodeType::Other) {
      // surplus rank: receive our reduced block from the leader; done
      int need_recv_cnt = block_len[rank_];
      linkers_->Recv(recursive_halving_map_.neighbor, output, need_recv_cnt);
      return;
    }
  }
  // participants: our reduced block now lives in input — copy it to output
  std::memcpy(output, input + block_start[rank_], block_len[rank_]);
}

Guolin Ke's avatar
Guolin Ke committed
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
/*!
 * \brief Reduce-scatter over a logical ring: partial sums travel around the
 *        ring, each rank reducing the received chunk into its local `input`
 *        before forwarding. After num_machines_ - 1 rounds rank i's own
 *        block in `input` is fully reduced and is copied to `output`.
 *        Unused parameters: total input size and output buffer size.
 *
 * WARNING: `input` is used as the accumulation buffer and is modified.
 */
void Network::ReduceScatterRing(char* input, comm_size_t, int type_size,
                                const comm_size_t* block_start, const comm_size_t* block_len, char* output,
                                comm_size_t, const ReduceFunction& reducer) {
  const int next_rank = (rank_ + 1) % num_machines_;
  const int prev_rank = (rank_ - 1 + num_machines_) % num_machines_;
  // first round: forward the predecessor's block, receive the one before it
  int send_block = prev_rank;
  int recv_block = (prev_rank - 1 + num_machines_) % num_machines_;
  for (int round = 1; round < num_machines_; ++round) {
    // stage the incoming chunk in output, then fold it into input in place
    linkers_->SendRecv(next_rank, input + block_start[send_block], block_len[send_block],
                       prev_rank, output, block_len[recv_block]);
    reducer(output, input + block_start[recv_block], type_size, block_len[recv_block]);
    // block indices walk backwards around the ring each round
    send_block = (send_block - 1 + num_machines_) % num_machines_;
    recv_block = (recv_block - 1 + num_machines_) % num_machines_;
  }
  // our own block has now accumulated every rank's contribution
  std::memcpy(output, input + block_start[rank_], block_len[rank_]);
}

323
324
325
326
327
328
329
330
/*! \brief Returns this machine's 0-based rank within the cluster. */
int Network::rank() {
  return Network::rank_;
}

/*! \brief Returns the total number of machines (1 when not distributed). */
int Network::num_machines() {
  return Network::num_machines_;
}

Guolin Ke's avatar
Guolin Ke committed
331
}  // namespace LightGBM