moe.cpp 5.77 KB
Newer Older
Jiezhong Qiu's avatar
can run  
Jiezhong Qiu committed
1
2
3
4
5
6
#include <torch/extension.h>

#include <cstdio>
#include <iostream>
#include <vector>

7
#include "moe_cuda_kernel.h"
Jiezhong Qiu's avatar
update  
Jiezhong Qiu committed
8
9
10
11
12
13

// Input-validation helpers for tensors that are handed to the CUDA kernels.
// NOTE: AT_ASSERT has become AT_CHECK on master after 0.4.
//
// Every macro expands to exactly ONE statement, so each of them stays correct
// when used as the body of an unbraced `if` / `else` / loop. The argument is
// parenthesized so that arbitrary expressions can be passed.
#define CHECK_CUDA(x) AT_ASSERTM((x).is_cuda(), #x " must be a CUDA tensor")
#define CHECK_CONTIGUOUS(x) AT_ASSERTM((x).is_contiguous(), #x " must be contiguous")
// Runs both checks above; wrapped in do/while(0) to behave as a single statement.
#define CHECK_INPUT(x) do { CHECK_CUDA(x); CHECK_CONTIGUOUS(x); } while (0)

Rick Ho's avatar
Rick Ho committed
14
std::vector<torch::Tensor> moe_expert_count(
15
16
		torch::Tensor gate, 
		size_t num_expert) {
Rick Ho's avatar
Rick Ho committed
17
	CHECK_INPUT(gate);
18
	return moe_cuda_expert_count(gate, num_expert);
Rick Ho's avatar
Rick Ho committed
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
}

/**
 * Python-facing wrapper around moe_cuda_local_scatter.
 *
 * Only `input` is validated (must be a contiguous CUDA tensor); `pos` is
 * passed through to the CUDA implementation without any check here.
 */
std::vector<torch::Tensor> moe_local_scatter(torch::Tensor input, torch::Tensor pos) {
	CHECK_CUDA(input);
	CHECK_CONTIGUOUS(input);
	return moe_cuda_local_scatter(input, pos);
}

/**
 * Python-facing wrapper around moe_cuda_local_gather.
 *
 * Only `output_buf` is validated (must be a contiguous CUDA tensor); `pos` is
 * passed through to the CUDA implementation without any check here.
 */
std::vector<torch::Tensor> moe_local_gather(torch::Tensor output_buf, torch::Tensor pos) {
	CHECK_CUDA(output_buf);
	CHECK_CONTIGUOUS(output_buf);
	return moe_cuda_local_gather(output_buf, pos);
}


36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * Fold the bias into the matrix product, using  Wx + b = [W b] [x; 1].
 *
 * Both reference parameters are rebound to new tensors:
 *   - `weight`    becomes cat(weight, bias.unsqueeze(2)) along dim 2;
 *   - `input_buf` becomes cat(input_buf, ones) along dim 1, where `ones` has
 *     input_buf.size(0) rows, one column, and input_buf's device and dtype.
 *
 * `bias_o.value()` is called unconditionally, so callers must pass an
 * optional that actually holds a tensor.
 */
void merge_bias(torch::Tensor &input_buf, torch::Tensor &weight, at::optional<torch::Tensor> bias_o) {
	// Append the bias as one extra trailing column of every expert's weight: [W b]
	const torch::Tensor bias_column = bias_o.value().unsqueeze(2);
	weight = at::cat({weight, bias_column}, 2);

	// Matching column of ones for the input, created like input_buf: [X 1]
	const auto like_input = torch::TensorOptions()
		.device(input_buf.device())
		.dtype(input_buf.dtype());
	const auto ones_column = at::ones(input_buf.size(0), like_input).unsqueeze(1);
	input_buf = at::cat({input_buf, ones_column}, 1);
}

Jiezhong Qiu's avatar
Jiezhong Qiu committed
50
/**
 * Python-facing forward entry point; delegates to moe_cuda_forward.
 *
 * When a bias is supplied it is first merged into `weight` / `input_buf` via
 * merge_bias. The CUDA/contiguity checks are applied afterwards, i.e. to the
 * tensors that are actually handed to the kernel. `expert_count` is forwarded
 * without any check here.
 */
std::vector<torch::Tensor> moe_forward(
		torch::Tensor input_buf,            // [batch_size x in_feat]
		torch::Tensor expert_count,         // [batch_size] per the original annotation -- TODO confirm
		torch::Tensor weight,               // [num_expert x out_feat x in_feat]
		at::optional<torch::Tensor> bias_o  // [num_expert x out_feat] or None
		) {
	// Wx+b = [W b] [x]
	//              [1]
	const bool with_bias = bias_o.has_value();
	if (with_bias) {
		merge_bias(input_buf, weight, bias_o);
	}

	CHECK_CUDA(input_buf);
	CHECK_CONTIGUOUS(input_buf);
	CHECK_CUDA(weight);
	CHECK_CONTIGUOUS(weight);

	return moe_cuda_forward(input_buf, expert_count, weight);
}

Jiezhong Qiu's avatar
Jiezhong Qiu committed
67
/**
 * Python-facing backward entry point; delegates to moe_cuda_backward.
 *
 * When a bias is supplied it is first merged into `weight` / `input_buf` via
 * merge_bias (same transformation as in the forward pass). The CUDA/contiguity
 * checks are applied afterwards. The kernel is additionally told whether a
 * bias was present; `expert_count` is forwarded without any check here.
 */
std::vector<torch::Tensor> moe_backward(
		torch::Tensor grad_output_buf,      // [batch_size x out_feat]
		torch::Tensor input_buf,            // [batch_size x in_feat]
		torch::Tensor expert_count,
		torch::Tensor weight,               // [num_expert x out_feat x in_feat]
		at::optional<torch::Tensor> bias_o  // [num_expert x out_feat] or None
		) {
	// Wx+b = [W b] [x]
	//              [1]
	const bool with_bias = bias_o.has_value();
	if (with_bias) {
		merge_bias(input_buf, weight, bias_o);
	}

	CHECK_CUDA(grad_output_buf);
	CHECK_CONTIGUOUS(grad_output_buf);
	CHECK_CUDA(input_buf);
	CHECK_CONTIGUOUS(input_buf);
	CHECK_CUDA(weight);
	CHECK_CONTIGUOUS(weight);

	return moe_cuda_backward(grad_output_buf, input_buf, expert_count, weight, with_bias);
}

86
87
#ifdef MOE_USE_NCCL

Rick Ho's avatar
Rick Ho committed
88
89
90
91
92
93
/**
 * Python-facing wrapper around moe_cuda_expert_exchange.
 *
 * Performs no validation of its own: all three arguments are forwarded
 * unchanged and the CUDA implementation's result is returned.
 */
std::vector<torch::Tensor> moe_expert_exchange(
		torch::Tensor local_expert_count,
		size_t num_expert,
		size_t n_workers) {
	return moe_cuda_expert_exchange(
			local_expert_count,
			num_expert,
			n_workers);
}

94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
 * Python-facing wrapper around moe_cuda_global_scatter.
 *
 * Only `input_buf` is validated (must be a contiguous CUDA tensor). The two
 * expert-count tensors and the size arguments are forwarded unchanged.
 */
std::vector<torch::Tensor> moe_global_scatter(
		torch::Tensor input_buf,
		torch::Tensor local_expert_count,
		torch::Tensor global_expert_count,
		size_t batch_size,
		size_t n_workers) {
	CHECK_CUDA(input_buf);
	CHECK_CONTIGUOUS(input_buf);
	return moe_cuda_global_scatter(
			input_buf,
			local_expert_count,
			global_expert_count,
			batch_size,
			n_workers);
}

/**
 * Python-facing wrapper around moe_cuda_global_gather.
 *
 * Only `output_buf` is validated (must be a contiguous CUDA tensor). The two
 * expert-count tensors and the size arguments are forwarded unchanged.
 */
std::vector<torch::Tensor> moe_global_gather(
		torch::Tensor output_buf,
		torch::Tensor local_expert_count,
		torch::Tensor global_expert_count,
		size_t batch_size,
		size_t n_workers) {
	CHECK_CUDA(output_buf);
	CHECK_CONTIGUOUS(output_buf);
	return moe_cuda_global_gather(
			output_buf,
			local_expert_count,
			global_expert_count,
			batch_size,
			n_workers);
}

Rick Ho's avatar
Rick Ho committed
116
117
118
119
120
121
122
123
124
125
126
127
128
129

/**
 * Python-facing wrapper around moe_cuda_global_fused_forward.
 *
 * `input_buf` and `weight` must be contiguous CUDA tensors. The expert-count
 * tensors and the three size arguments are forwarded unchanged.
 */
std::vector<torch::Tensor> moe_global_fused_forward(
		torch::Tensor input_buf,
		torch::Tensor weight,
		torch::Tensor local_expert_count,
		torch::Tensor global_expert_count,
		long global_batch_size,
		long local_batch_size,
		long n_workers) {
	CHECK_CUDA(input_buf);
	CHECK_CONTIGUOUS(input_buf);
	CHECK_CUDA(weight);
	CHECK_CONTIGUOUS(weight);
	return moe_cuda_global_fused_forward(
			input_buf,
			weight,
			local_expert_count,
			global_expert_count,
			global_batch_size,
			local_batch_size,
			n_workers);
}

Rick Ho's avatar
Rick Ho committed
130
131
132
133
134
135
136
#include <c10d/ProcessGroupNCCL.hpp>
#include "cuda_stream_manager.h"

/**
 * Subclass of c10d::ProcessGroupNCCL that exists only so getcomm() can call
 * ProcessGroupNCCL member functions (broadcastUniqueNCCLID / getNCCLComm) --
 * presumably these are not accessible from outside the class hierarchy.
 * It adds no data members.
 */
class HackNCCLGroup: public c10d::ProcessGroupNCCL {
public:
	/**
	 * Obtain a raw NCCL communicator.
	 *
	 * With ENABLE_NCCL_P2P_SUPPORT: rank 0 generates a fresh NCCL unique id,
	 * the id is distributed through broadcastUniqueNCCLID, and a new
	 * communicator spanning getSize() ranks is created with ncclCommInitRank.
	 * `dev` is not used in this configuration.
	 *
	 * Otherwise: the communicator list for `dev` is requested from
	 * getNCCLComm (keyed by the device index) and its first entry is returned.
	 *
	 * @param dev device whose communicator is requested
	 * @return the communicator, or nullptr if none could be obtained
	 */
	ncclComm_t getcomm(at::Device dev) {
#ifdef ENABLE_NCCL_P2P_SUPPORT
		(void)dev;  // only needed by the non-P2P path below
		ncclUniqueId ncclID;
		const int rank = getRank();
		if (rank == 0) {
			const ncclResult_t id_status = ncclGetUniqueId(&ncclID);
			if (id_status != ncclSuccess) {
				// Still take part in the broadcast below so that the other
				// ranks are not left waiting for an id that never arrives.
				std::cerr << "ncclGetUniqueId failed: "
					<< ncclGetErrorString(id_status) << "\n";
			}
		}
		broadcastUniqueNCCLID(&ncclID,
				c10d::OpType::SEND,
				"fastmoe_nccl_comm",
				rank);
		// Start from nullptr so a failed init can never hand an
		// indeterminate handle back to the caller.
		ncclComm_t comm = nullptr;
		const ncclResult_t init_status =
			ncclCommInitRank(&comm, getSize(), ncclID, rank);
		if (init_status != ncclSuccess) {
			std::cerr << "ncclCommInitRank failed: "
				<< ncclGetErrorString(init_status) << "\n";
			return nullptr;
		}
		return comm;
#else
		const auto key = std::to_string(dev.index());
		auto v = getNCCLComm(key, {dev});
		if (v.size() == 0) {
			std::cerr << "PyTorch has nothing\n";
			return nullptr;
		}
		// Diagnostic output only; count stays 0 if the query itself fails.
		int count = 0;
		ncclCommCount(v[0]->getNcclComm(), &count);
		std::cerr << "PyTorch has " << v.size() << " comms, comm 0 size " << count << "\n";
		return v[0]->getNcclComm();
#endif
	}
};

/**
 * Make sure the stream manager for `t`'s device holds an NCCL communicator.
 *
 * If the manager's `ncclgood` flag is already set, nothing happens. Otherwise
 * `p` is reinterpreted as a HackNCCLGroup to call getcomm() for `t`'s device;
 * the result is stored in `ncclcomm`, and `ncclgood` is set to 1 only when
 * the result is non-zero. A zero result is reported on stderr.
 */
void moe_ensure_nccl(c10d::ProcessGroupNCCL& p, torch::Tensor t) {
	auto stream_mgr = getCudaStreamManager(t.device().index());
	if (!stream_mgr->ncclgood) {
		// Same pointer reinterpretation as a C-style cast through void*.
		auto* hacked_group = static_cast<HackNCCLGroup*>(static_cast<void*>(&p));
		stream_mgr->ncclcomm = hacked_group->getcomm(t.device());
		if (stream_mgr->ncclcomm == 0) {
			std::cerr << "Nccl initialization failed\n";
		} else {
			stream_mgr->ncclgood = 1;
		}
	}
}
Rick Ho's avatar
Rick Ho committed
177
178

#endif  // MOE_USE_NCCL
Jiezhong Qiu's avatar
update  
Jiezhong Qiu committed
179
180

// Python bindings. Each m.def exposes one of the wrapper functions defined in
// this file under a short name; the NCCL-dependent entries are only
// registered when the extension is built with MOE_USE_NCCL.
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
  m.def("expert_count", &moe_expert_count, "MoE expert count (CUDA)");
  m.def("local_scatter", &moe_local_scatter, "MoE local scatter (CUDA)");
  m.def("local_gather", &moe_local_gather, "MoE local gather (CUDA)");
#ifdef MOE_USE_NCCL
  m.def("expert_exchange", &moe_expert_exchange, "MoE expert exchange (CUDA)");
  m.def("global_scatter", &moe_global_scatter, "MoE global scatter (CUDA)");
  m.def("global_gather", &moe_global_gather, "MoE global gather (CUDA)");
  // Docstring previously duplicated the one of "global_gather".
  m.def("global_fused_forward", &moe_global_fused_forward,
		  "MoE global fused forward (CUDA)");
  m.def("ensure_nccl", &moe_ensure_nccl, "MoE ensure torch nccl comm");
#endif
  m.def("forward", &moe_forward, "MoE forward (CUDA)");
  m.def("backward", &moe_backward, "MoE backward (CUDA)");
}