// network.cpp: distributed-communication primitives for LightGBM
// (Allreduce, Allgather, ReduceScatter) implemented over socket linkers.
#include <LightGBM/network.h>

#include <LightGBM/utils/common.h>

#include "linkers.h"

#include <cstring>
#include <cstdlib>

namespace LightGBM {

Hui Xue's avatar
Hui Xue committed
12
// static member definition
Guolin Ke's avatar
Guolin Ke committed
13
14
int Network::num_machines_;
int Network::rank_;
Guolin Ke's avatar
Guolin Ke committed
15
std::unique_ptr<Linkers> Network::linkers_;
Guolin Ke's avatar
Guolin Ke committed
16
17
BruckMap Network::bruck_map_;
RecursiveHalvingMap Network::recursive_halving_map_;
Guolin Ke's avatar
Guolin Ke committed
18
19
std::vector<int> Network::block_start_;
std::vector<int>  Network::block_len_;
Guolin Ke's avatar
Guolin Ke committed
20
int Network::buffer_size_;
Guolin Ke's avatar
Guolin Ke committed
21
std::vector<char> Network::buffer_;
Guolin Ke's avatar
Guolin Ke committed
22
23

/*!
* \brief Initialize the network layer: connect to all machines and
*        precompute the communication schedules and scratch buffers.
* \param config Network configuration (machine list, ports, etc.)
*/
void Network::Init(NetworkConfig config) {
  // Establish socket connections to every other machine.
  linkers_.reset(new Linkers(config));
  rank_ = linkers_->rank();
  num_machines_ = linkers_->num_machines();
  // Schedules for Allgather (Bruck) and ReduceScatter (recursive halving).
  bruck_map_ = linkers_->bruck_map();
  recursive_halving_map_ = linkers_->recursive_halving_map();
  // One block descriptor per machine, reused by every collective.
  block_start_ = std::vector<int>(num_machines_);
  block_len_ = std::vector<int>(num_machines_);
  // Start with a 1 MB scratch buffer; AllreduceByAllGather grows it if needed.
  buffer_size_ = 1024 * 1024;
  buffer_.resize(buffer_size_);
  Log::Info("Local rank: %d, total number of machines: %d", rank_, num_machines_);
}

void Network::Dispose() {
Guolin Ke's avatar
Guolin Ke committed
37

Guolin Ke's avatar
Guolin Ke committed
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
}

void Network::Allreduce(char* input, int input_size, int type_size, char* output, const ReduceFunction& reducer) {
  int count = input_size / type_size;
  // if small package or small count , do it by all gather.(reduce the communication times.)
  if (count < num_machines_ || input_size < 4096) {
    AllreduceByAllGather(input, input_size, output, reducer);
    return;
  }
  // assign the blocks to every rank.
  int step = (count + num_machines_ - 1) / num_machines_;
  if (step < 1) {
    step = 1;
  }
  block_start_[0] = 0;
  for (int i = 0; i < num_machines_ - 1; ++i) {
Guolin Ke's avatar
Guolin Ke committed
54
    block_len_[i] = std::min(step * type_size, input_size - block_start_[i]);
Guolin Ke's avatar
Guolin Ke committed
55
56
57
58
    block_start_[i + 1] = block_start_[i] + block_len_[i];
  }
  block_len_[num_machines_ - 1] = input_size - block_start_[num_machines_ - 1];
  // do reduce scatter
Guolin Ke's avatar
Guolin Ke committed
59
  ReduceScatter(input, input_size, block_start_.data(), block_len_.data(), output, reducer);
Guolin Ke's avatar
Guolin Ke committed
60
  // do all gather
Guolin Ke's avatar
Guolin Ke committed
61
  Allgather(output, input_size, block_start_.data(), block_len_.data(), output);
Guolin Ke's avatar
Guolin Ke committed
62
63
64
65
66
67
68
69
70
71
72
73
74
75
}

void Network::AllreduceByAllGather(char* input, int input_size, char* output, const ReduceFunction& reducer) {
  // assign blocks
  int all_size = input_size * num_machines_;
  block_start_[0] = 0;
  block_len_[0] = input_size;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
    block_len_[i] = input_size;
  }
  // need use buffer here, since size of "output" is smaller than size after all gather
  if (input_size*num_machines_ > buffer_size_) {
    buffer_size_ = input_size*num_machines_;
Guolin Ke's avatar
Guolin Ke committed
76
    buffer_.resize(buffer_size_);
Guolin Ke's avatar
Guolin Ke committed
77
78
  }

Guolin Ke's avatar
Guolin Ke committed
79
  Allgather(input, all_size, block_start_.data(), block_len_.data(), buffer_.data());
Guolin Ke's avatar
Guolin Ke committed
80
  for (int i = 1; i < num_machines_; ++i) {
Guolin Ke's avatar
Guolin Ke committed
81
    reducer(buffer_.data() + block_start_[i], buffer_.data() + block_start_[0], input_size);
Guolin Ke's avatar
Guolin Ke committed
82
83
  }
  // copy back
Guolin Ke's avatar
Guolin Ke committed
84
  std::memcpy(output, buffer_.data(), input_size);
Guolin Ke's avatar
Guolin Ke committed
85
86
87
88
89
90
91
92
93
94
95
}

/*!
* \brief Allgather with a uniform block size: every machine contributes
*        send_size bytes; output receives num_machines_ * send_size bytes.
* \param input     Local contribution
* \param send_size Bytes contributed per machine
* \param output    Destination buffer (send_size * num_machines_ bytes)
*/
void Network::Allgather(char* input, int send_size, char* output) {
  // assign equal-sized blocks, one per machine
  block_start_[0] = 0;
  block_len_[0] = send_size;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
    block_len_[i] = send_size;
  }
  // start all gather
  Allgather(input, send_size * num_machines_, block_start_.data(), block_len_.data(), output);
}

Guolin Ke's avatar
Guolin Ke committed
99
void Network::Allgather(char* input, int all_size, const int* block_start, const int* block_len, char* output) {
Guolin Ke's avatar
Guolin Ke committed
100
101
102
103
104
105
106
  int write_pos = 0;
  // use output as receive buffer
  std::memcpy(output, input, block_len[rank_]);
  write_pos += block_len[rank_];
  int accumulated_block = 1;
  for (int i = 0; i < bruck_map_.k; ++i) {
    // get current local block size
Guolin Ke's avatar
Guolin Ke committed
107
    int cur_block_size = std::min(1 << i, num_machines_ - accumulated_block);
Guolin Ke's avatar
Guolin Ke committed
108
109
110
111
    // get out rank
    int out_rank = bruck_map_.out_ranks[i];
    // get in rank
    int in_rank = bruck_map_.in_ranks[i];
Guolin Ke's avatar
Guolin Ke committed
112
113
    // get send information
    int need_send_len = 0;
Guolin Ke's avatar
Guolin Ke committed
114
    // get recv information
Guolin Ke's avatar
Guolin Ke committed
115
    int need_recv_len = 0;
Guolin Ke's avatar
Guolin Ke committed
116
    for (int j = 0; j < cur_block_size; ++j) {
Guolin Ke's avatar
Guolin Ke committed
117
118
      need_send_len += block_len[(rank_ + j) % num_machines_];
      need_recv_len += block_len[(rank_ + accumulated_block + j) % num_machines_];
Guolin Ke's avatar
Guolin Ke committed
119
120
    }
    // send and recv at same time
Guolin Ke's avatar
Guolin Ke committed
121
122
    linkers_->SendRecv(out_rank, output, need_send_len, in_rank, output + write_pos, need_recv_len);
    write_pos += need_recv_len;
Guolin Ke's avatar
Guolin Ke committed
123
124
125
126
127
128
129
130
    accumulated_block += cur_block_size;
  }
  // rotate in-place
  std::reverse<char*>(output, output + all_size);
  std::reverse<char*>(output, output + block_start[rank_]);
  std::reverse<char*>(output + block_start[rank_], output + all_size);
}

Guolin Ke's avatar
Guolin Ke committed
131
132
133
134
135
136
137
void Network::ReduceScatter(char* input, int, const int* block_start, const int* block_len, char* output, const ReduceFunction& reducer) {
  if (recursive_halving_map_.need_pairwise) {
    for (int i = 1; i < num_machines_; ++i) {
      int out_rank = (rank_ + i) % num_machines_;
      int in_rank = (rank_ - i + num_machines_) % num_machines_;
      linkers_->SendRecv(out_rank, input + block_start[out_rank], block_len[out_rank], in_rank, output, block_len[rank_]);
      reducer(output, input + block_start[rank_], block_len[rank_]);
Guolin Ke's avatar
Guolin Ke committed
138
    }
Guolin Ke's avatar
Guolin Ke committed
139
  } else {
Guolin Ke's avatar
Guolin Ke committed
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
    for (int i = 0; i < recursive_halving_map_.k; ++i) {
      // get target
      int target = recursive_halving_map_.ranks[i];
      int send_block_start = recursive_halving_map_.send_block_start[i];
      int recv_block_start = recursive_halving_map_.recv_block_start[i];
      // get send information
      int send_size = 0;
      for (int j = 0; j < recursive_halving_map_.send_block_len[i]; ++j) {
        send_size += block_len[send_block_start + j];
      }
      // get recv information
      int need_recv_cnt = 0;
      for (int j = 0; j < recursive_halving_map_.recv_block_len[i]; ++j) {
        need_recv_cnt += block_len[recv_block_start + j];
      }
      // send and recv at same time
      linkers_->SendRecv(target, input + block_start[send_block_start], send_size, target, output, need_recv_cnt);
      // reduce
      reducer(output, input + block_start[recv_block_start], need_recv_cnt);
    }
  }
  // copy result
  std::memcpy(output, input + block_start[rank_], block_len[rank_]);
}

}  // namespace LightGBM