Commit c0439e64 authored by ThomasNing

Finished the receive-side kernel for the cross-GPU message receive path.

parent 5dd5b531
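
For orientation before the diff: the change gives the host GPU one dedicated landing buffer per sending GPU and threads those device pointers through to the receive kernel's kargs. Below is a minimal sketch of that host-side wiring, using only the ck_tile calls that appear in the diff (DeviceMem::Realloc, GetDeviceBuffer, the extended MakeKargs); build_receive_kargs, ReceiveKernel and buffer_bytes are hypothetical names for illustration, not part of the commit.

#include <array>
#include <cstddef>
#include <vector>
#include "ck_tile/host.hpp"

constexpr int MaxSendGPUNum = 7; // mirrors the constant added in this commit

template <typename ReceiveKernel>
auto build_receive_kargs(const void* p_reduce,
                         const void* p_output,
                         std::vector<ck_tile::DeviceMem>& slave_receive_bufs,
                         std::size_t buffer_bytes,
                         ck_tile::index_t M,
                         ck_tile::index_t N)
{
    // One receive buffer per slave GPU, allocated on the host GPU.
    for(auto& buf : slave_receive_bufs)
        buf.Realloc(buffer_bytes);

    // Collect the raw device pointers; unused slots stay null.
    std::array<const void*, MaxSendGPUNum> p_receive_list{};
    for(std::size_t i = 0; i < slave_receive_bufs.size(); ++i)
        p_receive_list[i] = slave_receive_bufs[i].GetDeviceBuffer();

    // Matches the updated MakeKargs(reduce, receive_list, output, M, N) signature.
    return ReceiveKernel::MakeKargs(p_reduce, p_receive_list, p_output, M, N);
}
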
......@@ -19,6 +19,7 @@ struct AllocateAndTransferFunctor
{
// Invoke the memory transfer between GPUs based on whether it is host gpu or slave gpu.
float invoke_transfer(ck_tile::DeviceMem& transfer_buf,
std::vector<ck_tile::DeviceMem>& receive_mem_bufs,
ck_tile::index_t host_gpu,
int device_id,
const ck_tile::ArgParser& arg_parser,
......@@ -86,8 +87,11 @@ struct AllocateAndTransferFunctor
if(static_cast<ck_tile::index_t>(device_id) == host_gpu)
{
// initialize the receive data buffer and global memory location.
ck_tile::HostTensor<InputType> receive_host({M, N});
ck_tile::DeviceMem receive_buf(receive_host.get_element_space_size_in_bytes());
std::array<const void*, MaxSendGPUNum> p_receive_list{};
for(size_t i = 0; i < receive_mem_bufs.size(); ++i)
{
p_receive_list[i] = receive_mem_bufs[i].GetDeviceBuffer();
}
args_receive.p_receive_list = p_receive_list;
// initialize the output data buffer.
std::string output_type = arg_parser.get_str("output_type");
if(output_type.compare("float") == 0)
......@@ -96,6 +100,7 @@ struct AllocateAndTransferFunctor
ck_tile::DeviceMem output_buf(output_host.get_element_space_size_in_bytes());
args_receive.p_output = output_buf.GetDeviceBuffer();
auto kargs_slave = SlaveKernel::MakeKargs(args_receive.p_reduce,
args_receive.p_receive_list,
args_receive.p_output,
args_receive.M,
args_receive.N);
......@@ -132,6 +137,7 @@ struct AllocateAndTransferFunctor
void operator()(int device_id,
ck_tile::HostTensor<InputType>& host_tensor,
ck_tile::DeviceMem& device_mem,
std::vector<ck_tile::DeviceMem>& receive_mem,
ck_tile::index_t host_gpu,
const ck_tile::ArgParser& arg_parser)
{
......@@ -142,6 +148,11 @@ struct AllocateAndTransferFunctor
<< hipGetErrorString(hip_err_set_device) << std::endl;
return;
}
if(device_id == host_gpu)
{
for(size_t i = 0; i < receive_mem.size(); ++i)
{
receive_mem[i].Realloc(host_tensor.get_element_space_size_in_bytes());
}
}
// Allocate device memory
device_mem.Realloc(host_tensor.get_element_space_size_in_bytes());
// Transfer data to device
......@@ -152,12 +163,14 @@ struct AllocateAndTransferFunctor
static_cast<int>(host_gpu),
static_cast<int>(worldSize),
device_mem.GetDeviceBuffer(),
receive_mem,
host_tensor.get_element_space_size_in_bytes());
int n_warmup = arg_parser.get_int("warmup");
int n_repeat = arg_parser.get_int("repeat");
invoke_transfer(device_mem,
receive_mem,
host_gpu,
device_id,
arg_parser,
......@@ -221,6 +234,7 @@ bool run_cross_gpu_reduce(ck_tile::ArgParser arg_parser)
std::vector<ck_tile::HostTensor<InputType>> transfer_tensor_host_list;
transfer_tensor_host_list.reserve(gpu_nums);
std::vector<ck_tile::DeviceMem> transfer_bufs(gpu_nums);
std::vector<ck_tile::DeviceMem> slave_receive_bufs(gpu_nums - 1);
std::vector<std::thread> threads;
AllocateAndTransferFunctor<InputType, OutputType> allocateAndTransfer;
......@@ -306,6 +320,7 @@ bool run_cross_gpu_reduce(ck_tile::ArgParser arg_parser)
device_list[i],
std::ref(transfer_tensor_host_list[i]),
std::ref(transfer_bufs[i]),
std::ref(slave_receive_bufs),
host_gpu,
arg_parser);
}
......
......@@ -5,9 +5,12 @@
#include "ck_tile/host.hpp"
constexpr int MaxSendGPUNum = 7;
struct transfer_receive_basic_args
{
const void* p_reduce;
std::array<const void*, MaxSendGPUNum> p_receive_list;
const void* p_output;
ck_tile::index_t host_gpu;
ck_tile::index_t device_id;
......
......@@ -5,6 +5,8 @@
#include "ck_tile/core.hpp"
#include "ck_tile/ops/common.hpp"
#include <vector>
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wsuggest-destructor-override"
#pragma clang diagnostic ignored "-Wold-style-cast"
......@@ -26,7 +28,12 @@ extern __constant__ DeviceHandle<mscclpp::SmChannel> constSlaveSmChannels[8]; //
extern __constant__ DeviceHandle<mscclpp::SmChannel> constMasterSmChannel;
void setupConnection(int rank, int slaveRank, int worldSize, void* dst_data, size_t dataSize)
void setupConnection(int rank,
int slaveRank,
int worldSize,
void* dst_data,
std::vector<ck_tile::DeviceMem>& receive_mem_vector,
size_t dataSize)
{
// Initialize MSCCL++ Communicator
auto bootstrap = std::make_shared<mscclpp::TcpBootstrap>(rank, worldSize);
......@@ -87,7 +94,7 @@ void setupConnection(int rank, int slaveRank, int worldSize, void* dst_data, siz
SmChannels.push_back(mscclpp::deviceHandle(
mscclpp::SmChannel(slaveSemaphores[i],
remoteMemories[i], // Remote buffer from the sender
dst_data // Local buffer (this slave's buffer)
receive_mem_vector[i].GetDeviceBuffer() // Local buffer (this slave's buffer)
)));
}
hipError_t error_slave =
......
......@@ -20,17 +20,20 @@ struct ReduceReceiveKernel
struct ReduceReceiveKargs
{
const void* reduce_ptr;
std::array<const void*, MaxSendGPUNum> receive_ptr_list;
const void* output_ptr;
index_t M;
index_t N;
};
CK_TILE_HOST static constexpr ReduceReceiveKargs MakeKargs(const void* reduce_ptr,
const void* output_ptr,
index_t M,
index_t N)
CK_TILE_HOST static constexpr ReduceReceiveKargs
MakeKargs(const void* reduce_ptr,
std::array<const void*, MaxSendGPUNum> receive_ptr_list,
const void* output_ptr,
index_t M,
index_t N)
{
return ReduceReceiveKargs{reduce_ptr, output_ptr, M, N};
return ReduceReceiveKargs{reduce_ptr, receive_ptr_list, output_ptr, M, N};
}
CK_TILE_HOST_DEVICE static constexpr index_t GetSmemSize()
......@@ -45,10 +48,10 @@ struct ReduceReceiveKernel
CK_TILE_DEVICE void operator()(ReduceReceiveKargs kargs) const
{
auto channel = *constSlaveSmChannels[0];
const auto [i_m, i_n] = CrossReducePartitioner{}();
const DataType* reduce_start = static_cast<const DataType*>(reduce_ptr);
auto transfer_tensor_view = [&]() {
auto channel = constSlaveSmChannels[0];
const auto [i_m, i_n] = CrossReducePartitioner{}();
const DataType* reduce_start = static_cast<const DataType*>(kargs.reduce_ptr);
auto transfer_tensor_view = [&]() {
return make_naive_tensor_view<address_space_enum::global>(
reduce_start,
make_tuple(kargs.M, kargs.N),
......@@ -61,12 +64,32 @@ struct ReduceReceiveKernel
make_tuple(number<ReduceReceivePipeline::Block_M>{},
number<ReduceReceivePipeline::Block_N>{}),
{i_m, i_n});
uint32_t numThreads = static_cast<uint32_t>(CrossReducePartitioner::NumThreads(kargs.M, kargs.N));
uint32_t threadId = static_cast<uint32_t>(i_m + i_n * (kargs.M + ReduceReceivePipeline::Block_M - 1) / ReduceReceivePipeline::Block_M);
uint64_t totalBytes = static_cast<uint64_t>(ReduceReceivePipeline::Block_M * ReduceReceivePipeline::Block_N * sizeof(DataType));
uint32_t numThreads =
static_cast<uint32_t>(CrossReducePartitioner::NumThreads(kargs.M, kargs.N));
uint32_t threadId =
static_cast<uint32_t>(i_m + i_n * ((kargs.M + ReduceReceivePipeline::Block_M - 1) /
ReduceReceivePipeline::Block_M));
uint64_t totalBytes = static_cast<uint64_t>(
ReduceReceivePipeline::Block_M * ReduceReceivePipeline::Block_N * sizeof(DataType));
channel.get(0, totalBytes, threadId, numThreads);
// After the channel get, start the memory block preparation for the receiving window
const DataType* receive_start =
static_cast<const DataType*>(kargs.receive_ptr_list[0]);
auto receive_tensor_view = [&]() {
return make_naive_tensor_view<address_space_enum::global>(
receive_start,
make_tuple(kargs.M, kargs.N),
make_tuple(kargs.N, 1),
number<ReduceReceivePipeline::Vector_N>{},
number<1>{});
}();
auto receive_block_window =
make_tile_window(receive_tensor_view,
make_tuple(number<ReduceReceivePipeline::Block_M>{},
number<ReduceReceivePipeline::Block_N>{}),
{i_m, i_n});
const ODataType* output_start = static_cast<const ODataType*>(kargs.output_ptr);
auto output_tensor_view = [&]() {
......@@ -85,7 +108,8 @@ struct ReduceReceiveKernel
__shared__ char smem_ptr[ReduceReceivePipeline::GetSmemSize()];
ReduceReceivePipeline{}(transfer_block_window, output_block_window, smem_ptr);
ReduceReceivePipeline{}(
transfer_block_window, receive_block_window, output_block_window, smem_ptr);
return;
}
};
......
......@@ -14,7 +14,7 @@ struct CrossReducePartitioner
static constexpr index_t kM = CrossReduceShape::Block_M;
static constexpr index_t kN = CrossReduceShape::Block_N;
CK_TILE_HOST static constexpr auto NumThreads(index_t M, index_t N){
CK_TILE_HOST_DEVICE static constexpr auto NumThreads(index_t M, index_t N){
index_t GridDimX = (M + kM - 1) / kM;
index_t GridDimY = (N + kN - 1) / kN;
return GridDimX * GridDimY;
......
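
A quick worked example of the block-to-participant mapping used above (hypothetical sizes, not the example's defaults): NumThreads linearizes the launch grid, and the receive kernel derives each block's id as i_m + i_n * GridDimX, so every id stays below NumThreads.

#include <cstdio>

int main()
{
    // Hypothetical problem and tile sizes for illustration only.
    const int M = 256, N = 512;
    const int Block_M = 64, Block_N = 64;

    const int GridDimX   = (M + Block_M - 1) / Block_M; // 4 blocks along M
    const int GridDimY   = (N + Block_N - 1) / Block_N; // 8 blocks along N
    const int numThreads = GridDimX * GridDimY;         // 32 participants for channel.get

    // Block (i_m, i_n) = (2, 3) gets linear id 2 + 3 * 4 = 14, always < numThreads.
    const int i_m = 2, i_n = 3;
    const int threadId = i_m + i_n * GridDimX;

    std::printf("numThreads=%d threadId=%d\n", numThreads, threadId);
    return 0;
}
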
......@@ -40,7 +40,7 @@ struct ReduceSendKernel
CK_TILE_DEVICE void operator()(ReduceSendKargs kargs) const
{
const auto i_M = CrossReducePartitioner{}();
const auto [i_m, i_n] = CrossReducePartitioner{}();
const DataType* reduce_start = static_cast<const DataType*>(kargs.reduce_ptr);
auto transfer_tensor_view = [&]() {
return make_naive_tensor_view<address_space_enum::global>(
......@@ -54,7 +54,7 @@ struct ReduceSendKernel
make_tile_window(transfer_tensor_view,
make_tuple(number<ReduceSendPipeline::Block_M>{},
number<ReduceSendPipeline::Block_N>{}),
{i_M, 0});
{i_m, i_n});
__shared__ char smem_ptr[ReduceSendPipeline::GetSmemSize()];
......
......@@ -40,10 +40,14 @@ struct CrossReduceReceivePipelineScaleUp
return Policy::template GetSmemSize<DataType, ReduceShape>();
}
template <typename InDramBlockWindowTmp, typename OutDramBlockWindowTmp>
CK_TILE_HOST_DEVICE auto operator()(const InDramBlockWindowTmp& input_dram_block_window_tmp,
const OutDramBlockWindowTmp& output_dram_block_window_tmp,
void* p_smem) const
template <typename InDramBlockWindowTmp,
typename ReceiveDramBlockWindowTmp,
typename OutDramBlockWindowTmp>
CK_TILE_HOST_DEVICE auto
operator()(const InDramBlockWindowTmp& input_dram_block_window_tmp,
const ReceiveDramBlockWindowTmp& receive_dram_block_window_tmp,
const OutDramBlockWindowTmp& output_dram_block_window_tmp,
void* p_smem) const
{
DataType* p_lds = static_cast<DataType*>(p_smem);
constexpr auto lds_block_desc = Policy::template MakeLdsBlockDescriptor<ReduceShape>();
......@@ -52,9 +56,6 @@ struct CrossReduceReceivePipelineScaleUp
integer_divide_ceil(sizeof(DataType) * lds_block_desc.get_element_space_size(), 16) *
16;
DataType* p_receive_lds = static_cast<DataType*>(
static_cast<void*>(static_cast<char*>(p_smem) + lds_block_space_size_aligned));
// DRAM tile window for load
auto copy_dram_window =
make_tile_window(input_dram_block_window_tmp.get_bottom_tensor_view(),
......@@ -69,10 +70,31 @@ struct CrossReduceReceivePipelineScaleUp
auto host_block_tile = load_tile(copy_dram_window);
const auto block_tile_tmp =
// Receive tile window initialization
DataType* p_receive_lds = static_cast<DataType*>(
static_cast<void*>(static_cast<char*>(p_smem) + lds_block_space_size_aligned));
auto receive_dram_window =
make_tile_window(receive_dram_block_window_tmp.get_bottom_tensor_view(),
make_tuple(number<Block_M>{}, number<Block_N>{}),
receive_dram_block_window_tmp.get_window_origin(),
Policy::template MakeDramTileDistribution<ReduceShape>());
auto receive_lds_block =
make_tensor_view<address_space_enum::lds>(p_receive_lds, lds_block_desc);
auto receive_lds_window = make_tile_window(receive_lds_block,
make_tuple(number<Block_M>{}, number<Block_N>{}),
{0, 0},
receive_dram_window.get_tile_distribution());
auto receive_block_tile = load_tile(receive_dram_window);
const auto host_block_tile_tmp =
tile_elementwise_in([](const DataType& a) { return a; }, host_block_tile);
store_tile(copy_lds_window, block_tile_tmp);
move_tile_window(copy_lds_window, {0, Block_N});
store_tile(copy_lds_window, host_block_tile_tmp);
const auto receive_block_tile_tmp =
tile_elementwise_in([](const DataType& a) { return a; }, receive_block_tile);
store_tile(receive_lds_window, receive_block_tile_tmp);
__syncthreads();
}
......
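
For reference, the shared-memory layout the updated pipeline relies on: the locally loaded tile sits in the first 16-byte-aligned LDS block and the received tile starts immediately after it, at lds_block_space_size_aligned. A standalone sketch of those offsets, assuming an unpadded Block_M x Block_N descriptor and fp32 data (both assumptions for illustration only):

#include <cstddef>
#include <cstdio>

int main()
{
    using DataType = float;                        // assumption: fp32 tiles
    const std::size_t Block_M = 64, Block_N = 64;  // hypothetical tile shape

    const std::size_t tile_bytes = sizeof(DataType) * Block_M * Block_N;
    // Mirrors integer_divide_ceil(tile_bytes, 16) * 16 from the pipeline.
    const std::size_t aligned = (tile_bytes + 15) / 16 * 16;

    const std::size_t host_tile_offset    = 0;           // p_lds
    const std::size_t receive_tile_offset = aligned;     // p_receive_lds
    const std::size_t total_smem          = 2 * aligned; // what GetSmemSize() must cover

    std::printf("host tile at %zu, receive tile at %zu, total %zu bytes\n",
                host_tile_offset, receive_tile_offset, total_smem);
    return 0;
}
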