"tests/git@developer.sourcefind.cn:tianlh/lightgbm-dcu.git" did not exist on "e2f3fc5ef41a414724be013b718b2e6a03ac0e5a"
network.cpp 13.5 KB
Newer Older
1
2
3
4
/*!
 * Copyright (c) 2016 Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See LICENSE file in the project root for license information.
 */
5
6
7
8
#include <LightGBM/network.h>

#include <LightGBM/utils/common.h>

9
10
11
#include <cstdlib>
#include <cstring>

12
#include "linkers.h"
Guolin Ke's avatar
Guolin Ke committed
13
14
15

namespace LightGBM {

Hui Xue's avatar
Hui Xue committed
16
// static member definition
17
18
// Definitions of Network's static members, each declared with THREAD_LOCAL.
// The initial values describe an uninitialized, single-machine state:
// one machine, rank 0, an empty buffer and no external collective callbacks.

// Number of participating machines and the rank of the local one.
THREAD_LOCAL int Network::num_machines_ = 1;
THREAD_LOCAL int Network::rank_ = 0;

// Point-to-point transport plus the communication maps copied from it in Init.
THREAD_LOCAL std::unique_ptr<Linkers> Network::linkers_;
THREAD_LOCAL BruckMap Network::bruck_map_;
THREAD_LOCAL RecursiveHalvingMap Network::recursive_halving_map_;

// Per-machine block layout (start offset and length) reused by the collectives.
THREAD_LOCAL std::vector<comm_size_t> Network::block_start_;
THREAD_LOCAL std::vector<comm_size_t> Network::block_len_;

// Scratch buffer and its current size.
THREAD_LOCAL comm_size_t Network::buffer_size_ = 0;
THREAD_LOCAL std::vector<char> Network::buffer_;

// Optional externally supplied collectives; nullptr means "use the built-in ones".
THREAD_LOCAL ReduceScatterFunction Network::reduce_scatter_ext_fun_ = nullptr;
THREAD_LOCAL AllgatherFunction Network::allgather_ext_fun_ = nullptr;
ww's avatar
ww committed
28

Guolin Ke's avatar
Guolin Ke committed
29

Guolin Ke's avatar
Guolin Ke committed
30
/*!
 * \brief Initialize the network from a configuration.
 *
 * Does nothing when config.num_machines is not greater than 1. Otherwise a
 * Linkers object is built from the config, the local rank, machine count and
 * communication maps are read from it, and the per-machine block tables and
 * the scratch buffer are allocated.
 *
 * \param config Configuration used to construct the Linkers object
 */
void Network::Init(Config config) {
  // Single-machine run: keep the default state untouched.
  if (config.num_machines <= 1) {
    return;
  }
  linkers_.reset(new Linkers(config));
  // The linkers object is the source of truth for topology information.
  rank_ = linkers_->rank();
  num_machines_ = linkers_->num_machines();
  bruck_map_ = linkers_->bruck_map();
  recursive_halving_map_ = linkers_->recursive_halving_map();
  // One zero-initialized (start, length) entry per machine.
  block_start_.assign(num_machines_, 0);
  block_len_.assign(num_machines_, 0);
  // Start with a 1MB scratch buffer.
  buffer_size_ = 1024 * 1024;
  buffer_.resize(buffer_size_);
  Log::Info("Local rank: %d, total number of machines: %d", rank_, num_machines_);
}

45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/*!
 * \brief Initialize the network with externally provided collective functions.
 *
 * Does nothing (the callbacks are not stored either) when num_machines is not
 * greater than 1. No Linkers object is created on this path.
 *
 * \param num_machines Total number of machines
 * \param rank Rank of the local machine
 * \param reduce_scatter_ext_fun External reduce-scatter implementation
 * \param allgather_ext_fun External allgather implementation
 */
void Network::Init(int num_machines, int rank,
                   ReduceScatterFunction reduce_scatter_ext_fun, AllgatherFunction allgather_ext_fun) {
  // Single-machine run: keep the default state untouched.
  if (num_machines <= 1) {
    return;
  }
  rank_ = rank;
  num_machines_ = num_machines;
  // One zero-initialized (start, length) entry per machine.
  block_start_.assign(num_machines_, 0);
  block_len_.assign(num_machines_, 0);
  // Start with a 1MB scratch buffer.
  buffer_size_ = 1024 * 1024;
  buffer_.resize(buffer_size_);
  reduce_scatter_ext_fun_ = reduce_scatter_ext_fun;
  allgather_ext_fun_ = allgather_ext_fun;
  Log::Info("Local rank: %d, total number of machines: %d", rank_, num_machines_);
}

Guolin Ke's avatar
Guolin Ke committed
60
void Network::Dispose() {
61
62
63
  num_machines_ = 1;
  rank_ = 0;
  linkers_.reset(new Linkers());
64
65
  reduce_scatter_ext_fun_ = nullptr;
  allgather_ext_fun_ = nullptr;
Guolin Ke's avatar
Guolin Ke committed
66
67
}

Guolin Ke's avatar
Guolin Ke committed
68
void Network::Allreduce(char* input, comm_size_t input_size, int type_size, char* output, const ReduceFunction& reducer) {
69
  if (num_machines_ <= 1) {
James Lamb's avatar
James Lamb committed
70
    Log::Fatal("Please initialize the network interface first");
71
  }
Guolin Ke's avatar
Guolin Ke committed
72
  comm_size_t count = input_size / type_size;
Guolin Ke's avatar
Guolin Ke committed
73
74
  // if small package or small count , do it by all gather.(reduce the communication times.)
  if (count < num_machines_ || input_size < 4096) {
Guolin Ke's avatar
Guolin Ke committed
75
    AllreduceByAllGather(input, input_size, type_size, output, reducer);
Guolin Ke's avatar
Guolin Ke committed
76
77
78
    return;
  }
  // assign the blocks to every rank.
Guolin Ke's avatar
Guolin Ke committed
79
  comm_size_t step = (count + num_machines_ - 1) / num_machines_;
Guolin Ke's avatar
Guolin Ke committed
80
81
82
83
84
  if (step < 1) {
    step = 1;
  }
  block_start_[0] = 0;
  for (int i = 0; i < num_machines_ - 1; ++i) {
Guolin Ke's avatar
Guolin Ke committed
85
    block_len_[i] = std::min<comm_size_t>(step * type_size, input_size - block_start_[i]);
Guolin Ke's avatar
Guolin Ke committed
86
87
88
89
    block_start_[i + 1] = block_start_[i] + block_len_[i];
  }
  block_len_[num_machines_ - 1] = input_size - block_start_[num_machines_ - 1];
  // do reduce scatter
Guolin Ke's avatar
Guolin Ke committed
90
  ReduceScatter(input, input_size, type_size, block_start_.data(), block_len_.data(), output, input_size, reducer);
Guolin Ke's avatar
Guolin Ke committed
91
  // do all gather
Guolin Ke's avatar
Guolin Ke committed
92
  Allgather(output, block_start_.data(), block_len_.data(), output, input_size);
Guolin Ke's avatar
Guolin Ke committed
93
94
}

Guolin Ke's avatar
Guolin Ke committed
95
void Network::AllreduceByAllGather(char* input, comm_size_t input_size, int type_size, char* output, const ReduceFunction& reducer) {
96
  if (num_machines_ <= 1) {
James Lamb's avatar
James Lamb committed
97
    Log::Fatal("Please initialize the network interface first");
98
  }
Guolin Ke's avatar
Guolin Ke committed
99
  // assign blocks
Guolin Ke's avatar
Guolin Ke committed
100
  comm_size_t all_size = input_size * num_machines_;
Guolin Ke's avatar
Guolin Ke committed
101
102
103
104
105
106
107
108
109
  block_start_[0] = 0;
  block_len_[0] = input_size;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
    block_len_[i] = input_size;
  }
  // need use buffer here, since size of "output" is smaller than size after all gather
  if (input_size*num_machines_ > buffer_size_) {
    buffer_size_ = input_size*num_machines_;
Guolin Ke's avatar
Guolin Ke committed
110
    buffer_.resize(buffer_size_);
Guolin Ke's avatar
Guolin Ke committed
111
112
  }

Guolin Ke's avatar
Guolin Ke committed
113
  Allgather(input, block_start_.data(), block_len_.data(), buffer_.data(), all_size);
Guolin Ke's avatar
Guolin Ke committed
114
  for (int i = 1; i < num_machines_; ++i) {
Guolin Ke's avatar
Guolin Ke committed
115
    reducer(buffer_.data() + block_start_[i], buffer_.data() + block_start_[0], type_size, input_size);
Guolin Ke's avatar
Guolin Ke committed
116
117
  }
  // copy back
Guolin Ke's avatar
Guolin Ke committed
118
  std::memcpy(output, buffer_.data(), input_size);
Guolin Ke's avatar
Guolin Ke committed
119
120
}

Guolin Ke's avatar
Guolin Ke committed
121
void Network::Allgather(char* input, comm_size_t send_size, char* output) {
122
  if (num_machines_ <= 1) {
James Lamb's avatar
James Lamb committed
123
    Log::Fatal("Please initialize the network interface first");
124
    return;
125
  }
Guolin Ke's avatar
Guolin Ke committed
126
127
128
129
130
131
132
133
  // assign blocks
  block_start_[0] = 0;
  block_len_[0] = send_size;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
    block_len_[i] = send_size;
  }
  // start all gather
Guolin Ke's avatar
Guolin Ke committed
134
  Allgather(input, block_start_.data(), block_len_.data(), output, send_size * num_machines_);
Guolin Ke's avatar
Guolin Ke committed
135
136
}

Guolin Ke's avatar
Guolin Ke committed
137
void Network::Allgather(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t all_size) {
138
  if (num_machines_ <= 1) {
James Lamb's avatar
James Lamb committed
139
    Log::Fatal("Please initialize the network interface first");
140
  }
141
  if (allgather_ext_fun_ != nullptr) {
Guolin Ke's avatar
Guolin Ke committed
142
    return allgather_ext_fun_(input, block_len[rank_], block_start, block_len, num_machines_, output, all_size);
143
  }
144
  const comm_size_t kRingThreshold = 10 * 1024 * 1024;  // 10MB
Guolin Ke's avatar
Guolin Ke committed
145
146
147
148
149
  const int kRingNodeThreshold = 64;
  if (all_size > kRingThreshold && num_machines_ < kRingNodeThreshold) {
    // when num_machines is small and data is large
    AllgatherRing(input, block_start, block_len, output, all_size);
  } else if (recursive_halving_map_.is_power_of_2) {
Guolin Ke's avatar
Guolin Ke committed
150
151
    AllgatherRecursiveDoubling(input, block_start, block_len, output, all_size);
  } else {
Guolin Ke's avatar
Guolin Ke committed
152
    AllgatherBruck(input, block_start, block_len, output, all_size);
Guolin Ke's avatar
Guolin Ke committed
153
154
155
156
  }
}

void Network::AllgatherBruck(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t all_size) {
Guolin Ke's avatar
Guolin Ke committed
157
  comm_size_t write_pos = 0;
Guolin Ke's avatar
Guolin Ke committed
158
159
160
161
162
163
  // use output as receive buffer
  std::memcpy(output, input, block_len[rank_]);
  write_pos += block_len[rank_];
  int accumulated_block = 1;
  for (int i = 0; i < bruck_map_.k; ++i) {
    // get current local block size
Guolin Ke's avatar
Guolin Ke committed
164
    int cur_block_size = std::min(1 << i, num_machines_ - accumulated_block);
Guolin Ke's avatar
Guolin Ke committed
165
166
167
168
    // get out rank
    int out_rank = bruck_map_.out_ranks[i];
    // get in rank
    int in_rank = bruck_map_.in_ranks[i];
Guolin Ke's avatar
Guolin Ke committed
169
    // get send information
Guolin Ke's avatar
Guolin Ke committed
170
    comm_size_t need_send_len = 0;
Guolin Ke's avatar
Guolin Ke committed
171
    // get recv information
Guolin Ke's avatar
Guolin Ke committed
172
    comm_size_t need_recv_len = 0;
Guolin Ke's avatar
Guolin Ke committed
173
    for (int j = 0; j < cur_block_size; ++j) {
Guolin Ke's avatar
Guolin Ke committed
174
175
      need_send_len += block_len[(rank_ + j) % num_machines_];
      need_recv_len += block_len[(rank_ + accumulated_block + j) % num_machines_];
Guolin Ke's avatar
Guolin Ke committed
176
177
    }
    // send and recv at same time
Guolin Ke's avatar
Guolin Ke committed
178
179
    linkers_->SendRecv(out_rank, output, need_send_len, in_rank, output + write_pos, need_recv_len);
    write_pos += need_recv_len;
Guolin Ke's avatar
Guolin Ke committed
180
181
182
183
184
185
186
187
    accumulated_block += cur_block_size;
  }
  // rotate in-place
  std::reverse<char*>(output, output + all_size);
  std::reverse<char*>(output, output + block_start[rank_]);
  std::reverse<char*>(output + block_start[rank_], output + all_size);
}

Guolin Ke's avatar
Guolin Ke committed
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
void Network::AllgatherRecursiveDoubling(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t) {
  // use output as receive buffer
  std::memcpy(output + block_start[rank_], input, block_len[rank_]);
  for (int i = 0; i < bruck_map_.k; ++i) {
    // get current local block size
    int cur_step = 1 << i;
    const int vgroup = rank_ / cur_step;
    const int vrank = vgroup * cur_step;
    int target = rank_ + cur_step;
    int target_vrank = (vgroup + 1) * cur_step;
    if (vgroup & 1) {
      target = rank_ - cur_step;
      target_vrank = (vgroup - 1) * cur_step;
    }
    // get send information
    comm_size_t need_send_len = 0;
    // get recv information
    comm_size_t need_recv_len = 0;
    for (int j = 0; j < cur_step; ++j) {
      need_send_len += block_len[(vrank + j)];
      need_recv_len += block_len[(target_vrank + j)];
    }
    // send and recv at same time
Guolin Ke's avatar
Guolin Ke committed
211
    linkers_->SendRecv(target, output + block_start[vrank], need_send_len,
Guolin Ke's avatar
Guolin Ke committed
212
213
214
215
216
217
218
219
220
                       target, output + block_start[target_vrank], need_recv_len);
  }
}

void Network::AllgatherRing(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t) {
  // use output as receive buffer
  std::memcpy(output + block_start[rank_], input, block_len[rank_]);
  int out_rank = (rank_ + 1) % num_machines_;
  int in_rank = (rank_ - 1 + num_machines_) % num_machines_;
Guolin Ke's avatar
Guolin Ke committed
221
222
  int out_block = rank_;
  int in_block = in_rank;
Guolin Ke's avatar
Guolin Ke committed
223
224
  for (int i = 1; i < num_machines_; ++i) {
    // send and recv at same time
Guolin Ke's avatar
Guolin Ke committed
225
226
227
228
    linkers_->SendRecv(out_rank, output + block_start[out_block], block_len[out_block],
                       in_rank, output + block_start[in_block], block_len[in_block]);
    out_block = (out_block - 1 + num_machines_) % num_machines_;
    in_block = (in_block - 1 + num_machines_) % num_machines_;
Guolin Ke's avatar
Guolin Ke committed
229
230
231
  }
}

Guolin Ke's avatar
Guolin Ke committed
232
233
234
void Network::ReduceScatter(char* input, comm_size_t input_size, int type_size,
                            const comm_size_t* block_start, const comm_size_t* block_len, char* output,
                            comm_size_t output_size, const ReduceFunction& reducer) {
235
  if (num_machines_ <= 1) {
James Lamb's avatar
James Lamb committed
236
    Log::Fatal("Please initialize the network interface first");
237
  }
238
  if (reduce_scatter_ext_fun_ != nullptr) {
Guolin Ke's avatar
Guolin Ke committed
239
    return reduce_scatter_ext_fun_(input, input_size, type_size, block_start, block_len, num_machines_, output, output_size, reducer);
ww's avatar
ww committed
240
  }
241
  const comm_size_t kRingThreshold = 10 * 1024 * 1024;  // 10MB
Guolin Ke's avatar
Guolin Ke committed
242
243
  if (recursive_halving_map_.is_power_of_2 || input_size < kRingThreshold) {
    ReduceScatterRecursiveHalving(input, input_size, type_size, block_start, block_len, output, output_size, reducer);
Guolin Ke's avatar
Guolin Ke committed
244
  } else {
Guolin Ke's avatar
Guolin Ke committed
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
    ReduceScatterRing(input, input_size, type_size, block_start, block_len, output, output_size, reducer);
  }
}

void Network::ReduceScatterRecursiveHalving(char* input, comm_size_t input_size, int type_size,
                                            const comm_size_t* block_start, const comm_size_t* block_len, char* output,
                                            comm_size_t, const ReduceFunction& reducer) {
  if (!recursive_halving_map_.is_power_of_2) {
    if (recursive_halving_map_.type == RecursiveHalvingNodeType::Other) {
      // send local data to neighbor first
      linkers_->Send(recursive_halving_map_.neighbor, input, input_size);
    } else if (recursive_halving_map_.type == RecursiveHalvingNodeType::GroupLeader) {
      // receive neighbor data first
      int need_recv_cnt = input_size;
      linkers_->Recv(recursive_halving_map_.neighbor, output, need_recv_cnt);
      // reduce
      reducer(output, input, type_size, input_size);
    }
  }
  if (recursive_halving_map_.type != RecursiveHalvingNodeType::Other) {
Guolin Ke's avatar
Guolin Ke committed
265
266
267
    for (int i = 0; i < recursive_halving_map_.k; ++i) {
      // get target
      int target = recursive_halving_map_.ranks[i];
Guolin Ke's avatar
Guolin Ke committed
268
269
      comm_size_t send_block_start = recursive_halving_map_.send_block_start[i];
      comm_size_t recv_block_start = recursive_halving_map_.recv_block_start[i];
Guolin Ke's avatar
Guolin Ke committed
270
      // get send information
Guolin Ke's avatar
Guolin Ke committed
271
      comm_size_t send_size = 0;
Guolin Ke's avatar
Guolin Ke committed
272
273
274
275
      for (int j = 0; j < recursive_halving_map_.send_block_len[i]; ++j) {
        send_size += block_len[send_block_start + j];
      }
      // get recv information
Guolin Ke's avatar
Guolin Ke committed
276
      comm_size_t need_recv_cnt = 0;
Guolin Ke's avatar
Guolin Ke committed
277
278
279
280
281
282
      for (int j = 0; j < recursive_halving_map_.recv_block_len[i]; ++j) {
        need_recv_cnt += block_len[recv_block_start + j];
      }
      // send and recv at same time
      linkers_->SendRecv(target, input + block_start[send_block_start], send_size, target, output, need_recv_cnt);
      // reduce
Guolin Ke's avatar
Guolin Ke committed
283
      reducer(output, input + block_start[recv_block_start], type_size, need_recv_cnt);
Guolin Ke's avatar
Guolin Ke committed
284
285
    }
  }
Guolin Ke's avatar
Guolin Ke committed
286
287
288
289
290
291
292
293
294
295
296
297
298
  if (!recursive_halving_map_.is_power_of_2) {
    if (recursive_halving_map_.type == RecursiveHalvingNodeType::GroupLeader) {
      // send result to neighbor
      linkers_->Send(recursive_halving_map_.neighbor,
                     input + block_start[recursive_halving_map_.neighbor],
                     block_len[recursive_halving_map_.neighbor]);
    } else if (recursive_halving_map_.type == RecursiveHalvingNodeType::Other) {
      // receive result from neighbor
      int need_recv_cnt = block_len[rank_];
      linkers_->Recv(recursive_halving_map_.neighbor, output, need_recv_cnt);
      return;
    }
  }
Guolin Ke's avatar
Guolin Ke committed
299
300
301
302
  // copy result
  std::memcpy(output, input + block_start[rank_], block_len[rank_]);
}

Guolin Ke's avatar
Guolin Ke committed
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
/*!
 * \brief Reduce-scatter over a ring.
 *
 * Every machine sends to rank_ + 1 and receives from rank_ - 1 (modulo
 * num_machines_). In each of the num_machines_ - 1 rounds a block is sent
 * on, the preceding block is received into output and merged into input.
 * At the end the local block is copied from input to output.
 *
 * \param input Local data; overwritten with partial reductions
 * \param type_size Size in bytes of a single element
 * \param block_start Start offset of each machine's block in input
 * \param block_len Length of each machine's block
 * \param output Receive buffer; holds the local result block on return
 * \param reducer Function called as reducer(src, dst, type_size, len)
 */
void Network::ReduceScatterRing(char* input, comm_size_t, int type_size,
                                const comm_size_t* block_start, const comm_size_t* block_len, char* output,
                                comm_size_t, const ReduceFunction& reducer) {
  const int next_rank = (rank_ + 1) % num_machines_;
  const int prev_rank = (rank_ - 1 + num_machines_) % num_machines_;
  // The block received in one round is the block sent in the next one, so a
  // single cursor is enough: the receive block is always one position behind.
  int send_block = prev_rank;
  for (int round = 1; round < num_machines_; ++round) {
    const int recv_block = (send_block - 1 + num_machines_) % num_machines_;
    linkers_->SendRecv(next_rank, input + block_start[send_block], block_len[send_block],
                       prev_rank, output, block_len[recv_block]);
    // merge the received partial result into the local copy of that block
    reducer(output, input + block_start[recv_block], type_size, block_len[recv_block]);
    send_block = recv_block;
  }
  // the local block in input now holds the final result
  std::memcpy(output, input + block_start[rank_], block_len[rank_]);
}

320
321
322
323
324
325
326
327
/*! \brief Rank of the local machine (0 until the network is initialized). */
int Network::rank() { return rank_; }

/*! \brief Number of machines in the network (1 until the network is initialized). */
int Network::num_machines() { return num_machines_; }

Guolin Ke's avatar
Guolin Ke committed
328
}  // namespace LightGBM