Commit bf4388c0 authored by Rick Ho's avatar Rick Ho
Browse files

Merge branch 'master' into laekov/batching

parents 284f1424 ef83c893
/* TODO: make it ke xue
#include <cuda_runtime.h>
#include <cassert>
#include <thread>
#include "cuda_stream_manager.h"
CudaStreamManager* smgr = NULL;
thread_local CudaStreamManager smgr;
CudaStreamManager* getCudaStreamManager(const size_t num_expert) {
CudaStreamManager* getCudaStreamManager(const size_t num_expert, const int device) {
if (!smgr) {
smgr = new CudaStreamManager(num_expert);
smgr = new CudaStreamManager(num_expert, device);
}
<<<<<<< HEAD
return smgr;
}
......@@ -20,3 +25,5 @@ void CudaStreamManager::sync(int i) {
cudaStreamSynchronize(streams[i]);
}
}
}
*/
......@@ -5,42 +5,49 @@
#include <cublas_v2.h>
#include <helper_cuda.h>
#include <cstdio>
#define MAX_STREAMS 16
class CudaStreamManager {
public:
CudaStreamManager() : num_expert(0), device(0), streams(NULL) {
int current_device;
checkCudaErrors(cudaGetDevice(&current_device));
#ifdef MOE_DEBUG
printf("constructor at device %d\n", current_device);
#endif
}
struct CudaStreamManager {
const size_t num_expert;
cublasHandle_t* handles;
cudaStream_t* streams;
CudaStreamManager(const size_t num_expert_) : num_expert(num_expert_) {
streams = new cudaStream_t[MAX_STREAMS];
handles = new cublasHandle_t[MAX_STREAMS];
for (size_t i=0; i<MAX_STREAMS; ++i) {
checkCudaErrors(cublasCreate(handles + i));
checkCudaErrors(cudaStreamCreate(streams + i));
checkCudaErrors(cublasSetStream(handles[i], streams[i]));
}
void setup(const size_t num_expert, const int device) {
#ifdef MOE_DEBUG
printf("setup at device %d\n", device);
#endif
this->num_expert = num_expert;
this->device = device;
checkCudaErrors(cudaSetDevice(device));
streams = new cudaStream_t[num_expert];
checkCudaErrors(cublasCreate(&handle));
for (size_t i=0; i<num_expert; ++i) {
checkCudaErrors(cudaStreamCreate(streams+i));
}
}
~CudaStreamManager() {
for (size_t i=0; i<MAX_STREAMS; ++i) {
checkCudaErrors(cudaStreamDestroy(streams[i]));
checkCudaErrors(cublasDestroy(handles[i]));
}
#ifdef MOE_DEBUG
printf("destructor at device %d\n", device);
#endif
for (size_t i=0; i<num_expert; ++i) {
checkCudaErrors(cudaStreamDestroy(*(streams+i)));
}
checkCudaErrors(cublasDestroy(handle));
delete[] streams;
}
inline cudaStream_t& getStream(int idx) {
return streams[idx % MAX_STREAMS];
}
inline cublasHandle_t& getHandle(int idx) {
return handles[idx % MAX_STREAMS];
}
void sync(int=-1);
size_t num_expert;
int device;
cublasHandle_t handle;
cudaStream_t* streams;
};
CudaStreamManager* getCudaStreamManager(const size_t num_expert);
// CudaStreamManager* getCudaStreamManager(const size_t num_expert, const int device);
#endif // CUDA_STREAM_MANAGER
......@@ -5,8 +5,6 @@ import torch
import moe_cuda
torch.manual_seed(42)
torch.cuda.manual_seed(42)
class MOEFunction(Function):
@staticmethod
......@@ -47,7 +45,7 @@ class MOELayer(nn.Module):
self.weight.data[i] = linear.weight.data
def forward(self, inp, gate):
return MOEFunction.apply(inp, gate, self.weight)
return MOEFunction.apply(inp, gate.int(), self.weight)
class MOELayer_raw(nn.Module):
......@@ -89,6 +87,8 @@ def test_module(moe, linear, inp, gate):
def test():
torch.manual_seed(42)
torch.cuda.manual_seed(42)
batch_size = 4
num_expert = 2
in_feat = 6
......@@ -112,5 +112,31 @@ def test():
err = (mo - ro).abs().sum()
print('{} abs err {}'.format(name, err))
def test_dp():
torch.manual_seed(42)
torch.cuda.manual_seed(42)
batch_size = 6
num_expert = 4
in_feat = 2
out_feat = 3
inp = torch.rand(batch_size, in_feat).cuda()
gate = torch.randint(low=0, high=num_expert, size=(batch_size, ), requires_grad=False).int().cuda()
print("data parallel of a nn.Linear model")
linear = nn.Linear(in_feat, in_feat).cuda()
linear_dp = torch.nn.DataParallel(linear, device_ids=[0,1,2])
output = linear_dp(inp)
print("successful!")
print("data parallel of our MoE model")
moe = MOELayer(num_expert, in_feat, out_feat).cuda()
moe_dp = torch.nn.DataParallel(moe, device_ids=[0,1,2])
for i in range(5):
output = moe_dp(inp, gate)
if __name__ == '__main__':
test()
# test()
test_dp()
\ No newline at end of file
......@@ -9,6 +9,7 @@
#include <cuda_runtime.h>
#include <cublas_v2.h>
#include <helper_cuda.h>
#include <c10/cuda/CUDAGuard.h>
#include "timer.hh"
......@@ -20,6 +21,8 @@
// #define MOE_BREAKDOWN
// #define MOE_DEBUG
thread_local CudaStreamManager smgr;
template <typename scalar_t>
__global__
void generate_ptr_offset_kernel(size_t n, const scalar_t* base, size_t stride,
......@@ -103,7 +106,6 @@ void moe_cuda_forward_impl(
expert_ptr[0] = 0;
for (int i = 1; i < num_expert; ++i) {
expert_ptr[i] = expert_ptr[i - 1] + expert_count[i - 1];
}
int *pos = new int[batch_size];
int *d_pos;
......@@ -197,8 +199,6 @@ void moe_cuda_grad_weight(
const size_t out_feat,
const size_t num_expert) {
auto h = getCudaStreamManager(num_expert);
int* gate_host = new int[batch_size];
scalar_t alpha = 1, beta = 1;
checkCudaErrors(cudaMemcpy(gate_host, gate, batch_size * sizeof(int), cudaMemcpyDeviceToHost));
......@@ -220,7 +220,7 @@ void moe_cuda_grad_weight(
out_feat));
}
for (size_t i=0; i<num_expert; ++i) {
checkCudaErrors(cudaStreamSynchronize(*(h->streams + i)));
checkCudaErrors(cudaStreamSynchronize(*(smgr.streams + i)));
}
delete[] gate_host;
}
......@@ -238,6 +238,10 @@ std::vector<torch::Tensor> moe_cuda_forward(
#ifdef MOE_DEBUG
printf("[forward] b=%ld, expert=%ld, in_feat (d_model)=%ld, out_feat (d_ffn)=%ld\n", batch_size, num_expert, in_feat, out_feat);
#endif
const int device = device_of(input).value().index();
if (smgr.streams == NULL) {
smgr.setup(num_expert, device);
}
auto output = input.new_zeros({batch_size, out_feat});
AT_DISPATCH_FLOATING_TYPES(input.scalar_type(), "moe_forward_cuda", ([&] {
......@@ -266,9 +270,14 @@ std::vector<torch::Tensor> moe_cuda_backward(
const auto num_expert = weight.size(0);
const auto out_feat = weight.size(1);
const auto in_feat = weight.size(2);
#ifdef MOE_DEBUG
printf("[backward] b=%ld, expert=%ld, in_feat (d_model)=%ld, out_feat (d_ffn)=%ld\n", batch_size, num_expert, in_feat, out_feat);
#endif
const int device = device_of(input).value().index();
if (smgr.streams == NULL) {
smgr.setup(num_expert, device);
}
auto grad_input = grad_output.new_zeros({batch_size, in_feat}); // batch_size x in_feat
auto grad_weight = grad_output.new_zeros({num_expert, out_feat, in_feat}); // num_expert x out_feat x in_feat
......
......@@ -10,10 +10,11 @@ def perf():
hidden_feat = int(sys.argv[3])
num_expert = int(sys.argv[4])
inp = torch.rand(batch_size, io_feat).cuda()
gate = torch.randint(low=0, high=num_expert, size=(batch_size, ), requires_grad=False).int().cuda()
moe = MOELayer(num_expert, io_feat, hidden_feat, io_feat).cuda()
inp = torch.rand(batch_size, in_feat).cuda("cuda:1")
gate = torch.randint(low=0, high=num_expert, size=(batch_size, ), requires_grad=False).int().cuda("cuda:1")
moe = MOELayer(num_expert, in_feat, out_feat).cuda("cuda:1")
o = moe(inp, gate)
o = moe(inp, gate)
......@@ -27,7 +28,7 @@ def perf():
maxt = 0.
sqtot = 0.
for i in range(n_runs):
gate = torch.randint(low=0, high=num_expert, size=(batch_size, ), requires_grad=False).int().cuda()
gate = torch.randint(low=0, high=num_expert, size=(batch_size, ), requires_grad=False).int().cuda("cuda:1")
ts = time.time()
o = moe(inp, gate)
te = time.time()
......
......@@ -11,7 +11,7 @@ setup(
name='moe_cuda',
sources=[
'moe.cpp',
'cuda_stream_manager.cpp',
# 'cuda_stream_manager.cpp',
'moe_cuda_kernel.cu',
],
extra_compile_args={'cxx': ['-I{}'.format(CUDA_HELPER)],
......
......@@ -9,6 +9,8 @@ import torch.nn as nn
import torch.nn.functional as F
# import torch_sparse
from cuda.moe import MOELayer
sys.path.append('utils')
from proj_adaptive_softmax import ProjectedAdaptiveLogSoftmax
from log_uniform_sampler import LogUniformSampler, sample_logits
......@@ -31,9 +33,74 @@ class PositionalEmbedding(nn.Module):
else:
return pos_emb[:,None,:]
class MoEPositionwiseFF(nn.Module):
class CustomizedMoEPositionwiseFF(nn.Module):
def __init__(self, d_model, d_inner, dropout, pre_lnorm=False, top_k=2, num_expert=32):
super(CustomizedMoEPositionwiseFF, self).__init__()
print("CustomizedMoEPositionwiseFF num_expert=%d top_k=%d" % (num_expert, top_k))
self.top_k = top_k
assert num_expert >= top_k
self.d_model = d_model
self.d_inner = d_inner
self.dropout = dropout
self.gate = nn.Linear(d_model, num_expert)
self.moe1 = MOELayer(num_expert=num_expert, in_feat=d_model+1, out_feat=d_inner)
self.moe2 = MOELayer(num_expert=num_expert, in_feat=d_inner+1, out_feat=d_model)
self.layer_norm = nn.LayerNorm(d_model)
self.pre_lnorm = pre_lnorm
self.dropout = nn.Dropout(dropout)
self.reset_parameter()
def reset_parameter(self):
pass
def forward(self, inp):
residual = inp
if self.pre_lnorm:
inp = self.layer_norm(inp)
gate = self.gate(inp)
gate_top_k_val, gate_top_k_idx = torch.topk(gate, k=self.top_k, dim=-1, largest=True, sorted=False) # [.. x top_k]
gate_top_k_val = gate_top_k_val.view(-1, self.top_k)
gate_score = F.softmax(gate_top_k_val, dim=-1).unsqueeze(1) # (BxL) x 1 x top_k
gate_top_k_idx = gate_top_k_idx.view(-1, self.top_k)
core_out = []
inp = inp.view(-1, self.d_model)
inp = F.pad(inp, pad=(0, 1), mode='constant', value=1.0)
for i in range(self.top_k):
gate_idx = gate_top_k_idx[:, i].contiguous()
x = self.moe1(inp, gate_idx)
x = self.dropout(F.relu(x))
x = F.pad(x, pad=(0, 1), mode='constant', value=1.0)
x = self.moe2(x, gate_idx)
x = self.dropout(x) # (BxL) x d_model
core_out.append(x.unsqueeze(1)) # (BxL) x 1 x d_model
core_out = torch.cat(core_out, dim=1) # (BxL) x top_k x d_model
core_out = torch.bmm(gate_score, core_out) # (BxL) x 1 x d_model
core_out = core_out.view(residual.size(0), residual.size(1), self.d_model)
output = core_out + residual
if not self.pre_lnorm:
output = self.layer_norm(output)
return output
class MoEPositionwiseFFRaw(nn.Module):
def __init__(self, d_model, d_inner, dropout, pre_lnorm=False, top_k=64):
super(MoEPositionwiseFF, self).__init__()
super(MoEPositionwiseFFRaw, self).__init__()
print("MoEPositionwiseFF")
self.top_k = top_k
......@@ -820,7 +887,7 @@ class DecoderLayer(nn.Module):
self.dec_attn = MultiHeadAttn(n_head, d_model, d_head, dropout, **kwargs)
# self.dec_attn = ExtendedMultiHeadAttn(n_head, d_model, d_head, dropout, **kwargs)
self.pos_ff = MultiHeadHierarchicalMoEPositionwiseFF(d_model, d_inner, dropout,
self.pos_ff = CustomizedMoEPositionwiseFF(d_model, d_inner, dropout,
pre_lnorm=kwargs.get('pre_lnorm'))
def forward(self, dec_inp, dec_attn_mask=None, mems=None):
......@@ -840,7 +907,7 @@ class RelLearnableDecoderLayer(nn.Module):
self.dec_attn = RelLearnableMultiHeadAttn(n_head, d_model, d_head, dropout,
**kwargs)
self.pos_ff = MultiHeadHierarchicalMoEPositionwiseFF(d_model, d_inner, dropout,
self.pos_ff = CustomizedMoEPositionwiseFF(d_model, d_inner, dropout,
pre_lnorm=kwargs.get('pre_lnorm'))
def forward(self, dec_inp, r_emb, r_w_bias, r_bias, dec_attn_mask=None, mems=None):
......@@ -861,7 +928,7 @@ class RelPartialLearnableDecoderLayer(nn.Module):
self.dec_attn = RelPartialLearnableMultiHeadAttn(n_head, d_model,
d_head, dropout, **kwargs)
self.pos_ff = MultiHeadHierarchicalMoEPositionwiseFF(d_model, d_inner, dropout,
self.pos_ff = CustomizedMoEPositionwiseFF(d_model, d_inner, dropout,
pre_lnorm=kwargs.get('pre_lnorm'))
def forward(self, dec_inp, r, r_w_bias, r_r_bias, dec_attn_mask=None, mems=None):
......
#!/bin/bash
export LD_LIBRARY_PATH=/home/jiezhong/miniconda3/lib:/usr/local/cuda/lib64:$LD_LIBRARY_PATH
if [[ $1 == 'train' ]]; then
echo 'Run training...'
python train.py \
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment