Commit b5a9a18d authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge branch 'v0.7.2-dev-custom' into 'v0.7.2-dev'

V0.7.2 dev custom

See merge request dcutoolkit/deeplearing/vllm!90
parents ae0ed592 c8a63b38
......@@ -463,6 +463,11 @@ if(VLLM_GPU_LANG STREQUAL "CUDA")
# if CUDA endif
endif()
if(VLLM_GPU_LANG STREQUAL "HIP")
list(APPEND VLLM_EXT_SRC
"csrc/custom_all_reduce.cu")
endif()
message(STATUS "Enabling C extension.")
define_gpu_extension_target(
_C
......
......@@ -142,3 +142,44 @@ void register_graph_buffers(fptr_t _fa,
bytes.reserve(handles.size());
fa->register_graph_buffers(bytes, offsets);
}
std::tuple<fptr_t, torch::Tensor> allocate_shared_buffer_and_handle(
int64_t size) {
auto device_index = c10::cuda::current_device();
at::DeviceGuard device_guard(at::Device(at::DeviceType::CUDA, device_index));
void* buffer;
cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed;
auto stream = c10::cuda::getCurrentCUDAStream().stream();
AT_CUDA_CHECK(cudaThreadExchangeStreamCaptureMode(&mode));
#if defined(USE_ROCM)
// data buffers need to be "uncached" for signal on MI200
AT_CUDA_CHECK(
hipExtMallocWithFlags((void**)&buffer, size, hipDeviceMallocUncached));
#else
AT_CUDA_CHECK(cudaMalloc((void**)&buffer, size));
#endif
AT_CUDA_CHECK(cudaMemsetAsync(buffer, 0, size, stream));
AT_CUDA_CHECK(cudaStreamSynchronize(stream));
AT_CUDA_CHECK(cudaThreadExchangeStreamCaptureMode(&mode));
auto options =
torch::TensorOptions().dtype(torch::kUInt8).device(torch::kCPU);
auto handle =
torch::empty({static_cast<int64_t>(sizeof(cudaIpcMemHandle_t))}, options);
AT_CUDA_CHECK(
cudaIpcGetMemHandle((cudaIpcMemHandle_t*)handle.data_ptr(), buffer));
return std::make_tuple(reinterpret_cast<fptr_t>(buffer), handle);
}
fptr_t open_mem_handle(torch::Tensor& mem_handle) {
void* ipc_ptr;
AT_CUDA_CHECK(cudaIpcOpenMemHandle(
(void**)&ipc_ptr, *((const cudaIpcMemHandle_t*)mem_handle.data_ptr()),
cudaIpcMemLazyEnablePeerAccess));
return reinterpret_cast<fptr_t>(ipc_ptr);
}
void free_shared_buffer(fptr_t buffer) {
AT_CUDA_CHECK(cudaFree(reinterpret_cast<void*>(buffer)));
}
\ No newline at end of file
......@@ -5,6 +5,10 @@
#include <cuda_fp16.h>
#include <cuda_runtime.h>
#if defined(USE_ROCM)
typedef __hip_bfloat16 nv_bfloat16;
#endif
#include <iostream>
#include <array>
#include <limits>
......@@ -12,6 +16,7 @@
#include <unordered_map>
#include <vector>
namespace vllm {
#define CUDACHECK(cmd) \
do { \
cudaError_t e = cmd; \
......@@ -22,24 +27,38 @@
} \
} while (0)
namespace vllm {
// Maximal number of blocks in allreduce kernel.
constexpr int kMaxBlocks = 36;
// Default number of blocks in allreduce kernel.
#ifndef USE_ROCM
const int defaultBlockLimit = 36;
CUpointer_attribute rangeStartAddrAttr =
CUDA_POINTER_ATTRIBUTE_RANGE_START_ADDR;
#else
const int defaultBlockLimit = 16;
hipPointer_attribute rangeStartAddrAttr =
HIP_POINTER_ATTRIBUTE_RANGE_START_ADDR;
#endif
// Counter may overflow, but it's fine since unsigned int overflow is
// well-defined behavior.
using FlagType = uint32_t;
// Two sets of peer counters are needed for two syncs: starting and ending an
// operation. The reason is that it's possible for peer GPU block to arrive at
// the second sync point while the current GPU block haven't passed the first
// sync point. Thus, peer GPU may write counter+1 while current GPU is busy
// waiting for counter. We use alternating counter array to avoid this
// possibility.
struct Signal {
alignas(128) FlagType self_counter[kMaxBlocks][8];
// Two sets of peer counters are needed for two syncs. The reason is that
// it's possible for peer GPU block to arrive at the second sync point while
// the current GPU block haven't passed the first sync point. Thus, peer GPU
// may write counter+1 while current GPU is busy waiting for counter. We use
// alternating counter array to avoid this possibility.
alignas(128) FlagType peer_counter[2][kMaxBlocks][8];
alignas(128) FlagType start[kMaxBlocks][8];
alignas(128) FlagType end[kMaxBlocks][8];
alignas(128) FlagType _flag[kMaxBlocks]; // incremental flags for each rank
};
struct __align__(16) RankData {
const void* __restrict__ ptrs[8];
const void* ptrs[8];
};
struct __align__(16) RankSignals {
......@@ -134,27 +153,29 @@ DINLINE O downcast(array_t<float, O::size> val) {
}
}
#if !defined(USE_ROCM)
static DINLINE void st_flag_release(FlagType* flag_addr, FlagType flag) {
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700
asm volatile("st.release.sys.global.u32 [%1], %0;" ::"r"(flag),
"l"(flag_addr));
#else
#else
asm volatile("membar.sys; st.volatile.global.u32 [%1], %0;" ::"r"(flag),
"l"(flag_addr));
#endif
#endif
}
static DINLINE FlagType ld_flag_acquire(FlagType* flag_addr) {
FlagType flag;
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700
asm volatile("ld.acquire.sys.global.u32 %0, [%1];"
: "=r"(flag)
: "l"(flag_addr));
#else
#else
asm volatile("ld.volatile.global.u32 %0, [%1]; membar.gl;"
: "=r"(flag)
: "l"(flag_addr));
#endif
#endif
return flag;
}
......@@ -170,37 +191,108 @@ static DINLINE FlagType ld_flag_volatile(FlagType* flag_addr) {
return flag;
}
// is_start: whether this is the very first synchronization barrier.
// need_fence: whether a memory fence is needed. If true, a release-acquire
// semantic is used to enforce memory access order before and after this
// barrier.
template <int ngpus, bool is_start, bool need_fence = false>
DINLINE void multi_gpu_barrier(const RankSignals& sg, Signal* self_sg,
int rank) {
if constexpr (!is_start) __syncthreads();
static_assert(
!(is_start && need_fence)); // Start barrier shouldn't need fence.
// This function is meant to be used as the first synchronization in the all
// reduce kernel. Thus, it doesn't need to make any visibility guarantees for
// prior memory accesses. Note: volatile writes will not be reordered against
// other volatile writes.
template <int ngpus>
DINLINE void start_sync(const RankSignals& sg, Signal* self_sg, int rank) {
uint32_t flag = self_sg->_flag[blockIdx.x] + 1;
if (threadIdx.x < ngpus) {
// Increment the counter. Technically we only need one counter, but we use
// multiple per block to eliminate the need to share the counter via smem.
auto val = self_sg->self_counter[blockIdx.x][threadIdx.x] += 1;
auto peer_counter_ptr = &sg.signals[threadIdx.x]->start[blockIdx.x][rank];
auto self_counter_ptr = &self_sg->start[blockIdx.x][threadIdx.x];
// Write the expected counter value to peer and wait for correct value
// from peer.
st_flag_volatile(peer_counter_ptr, flag);
while (ld_flag_volatile(self_counter_ptr) != flag);
}
__syncthreads();
// use one thread to update flag
if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag;
}
// This function is meant to be used as the second or the final
// synchronization barrier in the all reduce kernel. If it's the final
// synchronization barrier, we don't need to make any visibility guarantees
// for prior memory accesses.
template <int ngpus, bool final_sync = false>
DINLINE void end_sync(const RankSignals& sg, Signal* self_sg, int rank) {
__syncthreads();
uint32_t flag = self_sg->_flag[blockIdx.x] + 1;
if (threadIdx.x < ngpus) {
auto peer_counter_ptr = &sg.signals[threadIdx.x]->end[blockIdx.x][rank];
auto self_counter_ptr = &self_sg->end[blockIdx.x][threadIdx.x];
// Write the expected counter value to peer and wait for correct value from
// peer.
auto peer_counter_ptr =
&sg.signals[threadIdx.x]->peer_counter[val % 2][blockIdx.x][rank];
auto self_counter_ptr =
&self_sg->peer_counter[val % 2][blockIdx.x][threadIdx.x];
if constexpr (need_fence) {
st_flag_release(peer_counter_ptr, val);
while (ld_flag_acquire(self_counter_ptr) != val);
if constexpr (!final_sync) {
st_flag_release(peer_counter_ptr, flag);
while (ld_flag_acquire(self_counter_ptr) != flag);
} else {
st_flag_volatile(peer_counter_ptr, val);
while (ld_flag_volatile(self_counter_ptr) != val);
st_flag_volatile(peer_counter_ptr, flag);
while (ld_flag_volatile(self_counter_ptr) != flag);
}
}
if constexpr (is_start || need_fence) __syncthreads();
if constexpr (!final_sync) __syncthreads();
// use one thread to update flag
if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag;
}
#else
template <int ngpus>
DINLINE void start_sync(const RankSignals& sg, Signal* self_sg, int rank) {
uint32_t flag = self_sg->_flag[blockIdx.x] + 1;
if (threadIdx.x < ngpus) {
// simultaneously write to the corresponding flag of all ranks.
// Latency = 1 p2p write
// __scoped_atomic_store_n(&sg.signals[threadIdx.x]->start[blockIdx.x][rank],
// flag, __ATOMIC_RELAXED, __MEMORY_SCOPE_SYSTEM);
// // wait until we got true from all ranks
// while (__scoped_atomic_load_n(&self_sg->start[blockIdx.x][threadIdx.x],
// __ATOMIC_RELAXED,
// __MEMORY_SCOPE_DEVICE) < flag);
__atomic_store_n(&sg.signals[threadIdx.x]->start[blockIdx.x][rank], flag,
__ATOMIC_RELAXED);
// wait until we got true from all ranks
while (__atomic_load_n(&self_sg->start[blockIdx.x][threadIdx.x],
__ATOMIC_RELAXED) < flag);
}
__syncthreads();
// use one thread to update flag
if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag;
}
template <int ngpus, bool final_sync = false>
DINLINE void end_sync(const RankSignals& sg, Signal* self_sg, int rank) {
__syncthreads();
uint32_t flag = self_sg->_flag[blockIdx.x] + 1;
if (threadIdx.x < ngpus) {
// simultaneously write to the corresponding flag of all ranks.
// Latency = 1 p2p write
// __scoped_atomic_store_n(&sg.signals[threadIdx.x]->end[blockIdx.x][rank],
// flag,
// final_sync ? __ATOMIC_RELAXED : __ATOMIC_RELEASE,
// __MEMORY_SCOPE_SYSTEM);
// // wait until we got true from all ranks
// while (
// __scoped_atomic_load_n(&self_sg->end[blockIdx.x][threadIdx.x],
// final_sync ? __ATOMIC_RELAXED : __ATOMIC_ACQUIRE,
// __MEMORY_SCOPE_DEVICE) < flag);
__atomic_store_n(&sg.signals[threadIdx.x]->end[blockIdx.x][rank], flag,
final_sync ? __ATOMIC_RELAXED : __ATOMIC_RELEASE);
// wait until we got true from all ranks
while (__atomic_load_n(&self_sg->end[blockIdx.x][threadIdx.x],
final_sync ? __ATOMIC_RELAXED : __ATOMIC_ACQUIRE) <
flag);
}
if constexpr (!final_sync) __syncthreads();
// use one thread to update flag
if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag;
}
#endif
template <typename P, int ngpus, typename A>
DINLINE P packed_reduce(const P* ptrs[], int idx) {
A tmp = upcast(ptrs[0][idx]);
......@@ -220,13 +312,13 @@ __global__ void __launch_bounds__(512, 1)
// note: we don't reorder the address so the accumulation order is the same
// for all ranks, ensuring bitwise identical results
auto dp = *_dp;
multi_gpu_barrier<ngpus, true>(sg, self_sg, rank);
start_sync<ngpus>(sg, self_sg, rank);
// do the actual reduction
for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size;
idx += gridDim.x * blockDim.x) {
((P*)result)[idx] = packed_reduce<P, ngpus, A>((const P**)&dp.ptrs[0], idx);
}
multi_gpu_barrier<ngpus, false>(sg, self_sg, rank);
end_sync<ngpus, true>(sg, self_sg, rank);
}
template <typename P>
......@@ -255,18 +347,20 @@ __global__ void __launch_bounds__(512, 1)
tmps[i] = get_tmp_buf<P>(sg.signals[target]);
}
auto tmp_out = tmps[0];
multi_gpu_barrier<ngpus, true>(sg, self_sg, rank);
start_sync<ngpus>(sg, self_sg, rank);
// stage 1: reduce scatter
for (int idx = start + tid; idx < end; idx += stride) {
tmp_out[idx - start] = packed_reduce<P, ngpus, A>(ptrs, idx);
}
multi_gpu_barrier<ngpus, false, true>(sg, self_sg, rank);
end_sync<ngpus>(sg, self_sg, rank);
// stage 2: allgather. Note: it's important to match the tid between
// the two stages, because visibility across devices is only guaranteed
// between threads that have the same tid. If thread i computes the sum of
// start + i in the first stage, then thread i also gathers start + i from all
// ranks.
// start + i in the first stage, then thread i also gathers start + i from
// all ranks.
for (int idx = tid; idx < largest_part; idx += stride) {
#pragma unroll
for (int i = 0; i < ngpus; i++) {
......@@ -290,18 +384,18 @@ class CustomAllreduce {
bool full_nvlink_;
RankSignals sg_;
// Stores an map from a pointer to its peer pointters from all ranks.
// Stores an map from a pointer to its peer pointers from all ranks.
std::unordered_map<void*, RankData*> buffers_;
Signal* self_sg_;
// Stores rank data from all ranks. This is mainly for cuda graph purposes.
// For cuda graph to work, all kernel arguments must be fixed during graph
// capture time. However, the peer pointers are not known during graph capture
// time. Therefore, during capture, we increment the rank data pointer and use
// that as the argument to the kernel. The kernel arguments are stored in
// graph_unreg_buffers_. The actual peer pointers will be filled in at the
// memory pointed to by the pointers in graph_unreg_buffers_ when
// the IPC handles are exchanged between ranks.
// capture time. However, the peer pointers are not known during graph
// capture time. Therefore, during capture, we increment the rank data
// pointer and use that as the argument to the kernel. The kernel arguments
// are stored in graph_unreg_buffers_. The actual peer pointers will be
// filled in at the memory pointed to by the pointers in
// graph_unreg_buffers_ when the IPC handles are exchanged between ranks.
//
// The overall process looks like this:
// 1. Graph capture.
......@@ -319,8 +413,9 @@ class CustomAllreduce {
* Signals are an array of ipc-enabled buffers from all ranks.
* For each of the buffer, the layout is as follows:
* | -- sizeof(Signal) -- | ------ a few MB ----- |
* The first section is for allreduce synchronization, and the second section
* is for storing the intermediate results required by some allreduce algos.
* The first section is for allreduce synchronization, and the second
* section is for storing the intermediate results required by some
* allreduce algos.
*
* Note: this class does not own any device memory. Any required buffers
* are passed in from the constructor.
......@@ -361,8 +456,7 @@ class CustomAllreduce {
void* base_ptr;
// note: must share the base address of each allocation, or we get wrong
// address
if (cuPointerGetAttribute(&base_ptr,
CU_POINTER_ATTRIBUTE_RANGE_START_ADDR,
if (cuPointerGetAttribute(&base_ptr, rangeStartAddrAttr,
(CUdeviceptr)ptr) != CUDA_SUCCESS)
throw std::runtime_error("failed to get pointer attr");
CUDACHECK(cudaIpcGetMemHandle(
......@@ -396,11 +490,11 @@ class CustomAllreduce {
// Note: when registering graph buffers, we intentionally choose to not
// deduplicate the addresses. That means if the allocator reuses some
// addresses, they will be registered again. This is to account for the remote
// possibility of different allocation patterns between ranks. For example,
// rank 1 may get the same input address for the second allreduce, but rank 2
// got a different address. IPC handles have internal reference counting
// mechanism so overhead should be small.
// addresses, they will be registered again. This is to account for the
// remote possibility of different allocation patterns between ranks. For
// example, rank 1 may get the same input address for the second allreduce,
// but rank 2 got a different address. IPC handles have internal reference
// counting mechanism so overhead should be small.
void register_graph_buffers(
const std::vector<std::string>& handles,
const std::vector<std::vector<int64_t>>& offsets) {
......@@ -431,15 +525,15 @@ class CustomAllreduce {
/**
* Performs allreduce, assuming input has already been registered.
*
* Block and grid default configs are results after careful grid search. Using
* 36 blocks give the best or close to the best runtime on the devices I
* tried: A100, A10, A30, T4, V100. You'll notice that NCCL kernels also only
* take a small amount of SMs. Not quite sure the underlying reason, but my
* guess is that too many SMs will cause contention on NVLink bus.
* Block and grid default configs are results after careful grid search.
* Using 36 blocks give the best or close to the best runtime on the devices
* I tried: A100, A10, A30, T4, V100. You'll notice that NCCL kernels also
* only take a small amount of SMs. Not quite sure the underlying reason,
* but my guess is that too many SMs will cause contention on NVLink bus.
*/
template <typename T>
void allreduce(cudaStream_t stream, T* input, T* output, int size,
int threads = 512, int block_limit = 36) {
int threads = 512, int block_limit = defaultBlockLimit) {
auto d = packed_t<T>::P::size;
if (size % d != 0)
throw std::runtime_error(
......@@ -473,8 +567,6 @@ class CustomAllreduce {
#define KL(ngpus, name) \
name<T, ngpus><<<blocks, threads, 0, stream>>>(ptrs, sg_, self_sg_, output, \
rank_, size);
// TODO(hanzhi713): Threshold is different for A100 and H100.
// Add per device threshold.
#define REDUCE_CASE(ngpus) \
case ngpus: { \
if (world_size_ == 2) { \
......@@ -497,7 +589,8 @@ class CustomAllreduce {
REDUCE_CASE(8)
default:
throw std::runtime_error(
"custom allreduce only supports num gpus in (2,4,6,8). Actual num "
"custom allreduce only supports num gpus in (2,4,6,8). Actual "
"num "
"gpus = " +
std::to_string(world_size_));
}
......@@ -511,9 +604,10 @@ class CustomAllreduce {
}
}
};
/**
* To inspect PTX/SASS, copy paste this header file to compiler explorer and add
a template instantiation:
* To inspect PTX/SASS, copy paste this header file to compiler explorer and
add a template instantiation:
* template void vllm::CustomAllreduce::allreduce<half>(cudaStream_t, half *,
half *, int, int, int);
*/
......
/**
* This is a standalone test for custom allreduce.
* To compile, make sure you have MPI and NCCL installed in your system.
* export MPI_HOME=xxx
* export MPI_HOME=XXX
* nvcc -O2 -arch=native -std=c++17 custom_all_reduce_test.cu -o
* custom_all_reduce_test -lnccl -I${MPI_HOME} -lmpi
* custom_all_reduce_test -lnccl -I${MPI_HOME}/include -lmpi
*
* Warning: this C++ test is not designed to be very readable and was used
* during the rapid prototyping process.
......@@ -11,319 +11,336 @@
* To run:
* mpirun --allow-run-as-root -np 8 ./custom_all_reduce_test
*/
#include <cuda.h>
#include <curand_kernel.h>
#include <stdio.h>
#include <stdlib.h>
#include <limits>
#include <vector>
#include "cuda_profiler_api.h"
#include "custom_all_reduce.cuh"
#include "mpi.h"
#include "nccl.h"
#define MPICHECK(cmd) \
do { \
int e = cmd; \
if (e != MPI_SUCCESS) { \
printf("Failed: MPI error %s:%d '%d'\n", __FILE__, __LINE__, e); \
exit(EXIT_FAILURE); \
} \
} while (0)
#define NCCLCHECK(cmd) \
do { \
ncclResult_t r = cmd; \
if (r != ncclSuccess) { \
printf("Failed, NCCL error %s:%d '%s'\n", __FILE__, __LINE__, \
ncclGetErrorString(r)); \
exit(EXIT_FAILURE); \
} \
} while (0)
__global__ void dummy_kernel() {
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700
for (int i = 0; i < 100; i++) __nanosleep(1000000); // 100ms
#else
for (int i = 0; i < 100; i++) {
long long int start = clock64();
while (clock64() - start < 150000000); // approximately 98.4ms on P40
}
#endif
}
template <typename T>
__global__ void set_data(T* data, int size, int myRank) {
for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size;
idx += gridDim.x * blockDim.x) {
data[idx] = myRank * 0.11f;
}
}
template <typename T>
__global__ void convert_data(const T* data1, const T* data2, double* fdata1,
double* fdata2, int size) {
for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size;
idx += gridDim.x * blockDim.x) {
fdata1[idx] = data1[idx];
fdata2[idx] = data2[idx];
}
}
__global__ void init_rand(curandState_t* state, int size, int nRanks) {
for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size;
idx += gridDim.x * blockDim.x) {
for (int i = 0; i < nRanks; i++) {
curand_init(i + 1, idx, 0, &state[idx * nRanks + i]);
}
}
}
template <typename T>
__global__ void gen_data(curandState_t* state, T* data, double* ground_truth,
int myRank, int nRanks, int size) {
for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size;
idx += gridDim.x * blockDim.x) {
double sum = 0.0;
for (int i = 0; i < nRanks; i++) {
double val = curand_uniform_double(&state[idx * nRanks + i]) * 4;
T hval = val; // downcast first
sum += static_cast<double>(hval);
if (i == myRank) data[idx] = hval;
}
ground_truth[idx] = sum;
}
}
template <typename T>
void run(int myRank, int nRanks, ncclComm_t& comm, int threads, int block_limit,
int data_size, bool performance_test) {
T* result;
cudaStream_t stream;
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
CUDACHECK(cudaMalloc(&result, data_size * sizeof(T)));
CUDACHECK(cudaMemset(result, 0, data_size * sizeof(T)));
cudaIpcMemHandle_t self_data_handle;
cudaIpcMemHandle_t data_handles[8];
vllm::Signal* buffer;
T* self_data_copy;
/**
* Allocate IPC buffer
*
* The first section is a temporary buffer for storing intermediate allreduce
* results, if a particular algorithm requires it. The second section is for
* the input to the allreduce. The actual API takes the input pointer as an
* argument (that is, they can and usually should be allocated separately).
* But since the input pointers and the temporary buffer all require IPC
* registration, they are allocated and registered together in the test for
* convenience.
*/
CUDACHECK(
cudaMalloc(&buffer, 2 * data_size * sizeof(T) + sizeof(vllm::Signal)));
CUDACHECK(
cudaMemset(buffer, 0, 2 * data_size * sizeof(T) + sizeof(vllm::Signal)));
CUDACHECK(cudaMalloc(&self_data_copy, data_size * sizeof(T)));
CUDACHECK(cudaIpcGetMemHandle(&self_data_handle, buffer));
MPICHECK(MPI_Allgather(&self_data_handle, sizeof(cudaIpcMemHandle_t),
MPI_BYTE, data_handles, sizeof(cudaIpcMemHandle_t),
MPI_BYTE, MPI_COMM_WORLD));
void* rank_data;
size_t rank_data_sz = 16 * 1024 * 1024;
CUDACHECK(cudaMalloc(&rank_data, rank_data_sz));
vllm::Signal* ipc_ptrs[8];
for (int i = 0; i < nRanks; i++) {
if (i == myRank)
ipc_ptrs[i] = buffer;
else
CUDACHECK(cudaIpcOpenMemHandle((void**)&ipc_ptrs[i], data_handles[i],
cudaIpcMemLazyEnablePeerAccess));
}
vllm::CustomAllreduce fa(ipc_ptrs, rank_data, rank_data_sz, myRank, nRanks);
auto* self_data =
reinterpret_cast<T*>(reinterpret_cast<char*>(buffer) +
sizeof(vllm::Signal) + data_size * sizeof(T));
// hack buffer registration
{
void* data[8];
for (int i = 0; i < nRanks; i++) {
data[i] =
((char*)ipc_ptrs[i]) + sizeof(vllm::Signal) + data_size * sizeof(T);
}
fa.register_buffer(data);
}
double* ground_truth;
CUDACHECK(cudaMallocHost(&ground_truth, data_size * sizeof(double)));
curandState_t* states;
CUDACHECK(cudaMalloc(&states, sizeof(curandState_t) * nRanks * data_size));
init_rand<<<108, 1024, 0, stream>>>(states, data_size, nRanks);
gen_data<T><<<108, 1024, 0, stream>>>(states, self_data, ground_truth, myRank,
nRanks, data_size);
CUDACHECK(cudaMemcpyAsync(self_data_copy, self_data, data_size * sizeof(T),
cudaMemcpyDeviceToDevice, stream));
cudaEvent_t start, stop;
CUDACHECK(cudaEventCreate(&start));
CUDACHECK(cudaEventCreate(&stop));
ncclDataType_t ncclDtype;
if (std::is_same<T, half>::value) {
ncclDtype = ncclFloat16;
} else if (std::is_same<T, nv_bfloat16>::value) {
ncclDtype = ncclBfloat16;
} else {
ncclDtype = ncclFloat;
}
double *nccl_result, *my_result;
CUDACHECK(cudaMallocHost(&nccl_result, data_size * sizeof(double)));
CUDACHECK(cudaMallocHost(&my_result, data_size * sizeof(double)));
if (performance_test) {
dummy_kernel<<<1, 1, 0, stream>>>();
constexpr int warmup_iters = 5;
constexpr int num_iters = 100;
// warmup
for (int i = 0; i < warmup_iters; i++) {
NCCLCHECK(ncclAllReduce(result, result, data_size, ncclDtype, ncclSum,
comm, stream));
}
CUDACHECK(cudaEventRecord(start, stream));
for (int i = 0; i < num_iters; i++) {
NCCLCHECK(ncclAllReduce(result, result, data_size, ncclDtype, ncclSum,
comm, stream));
}
CUDACHECK(cudaEventRecord(stop, stream));
CUDACHECK(cudaStreamSynchronize(stream));
float allreduce_ms = 0;
cudaEventElapsedTime(&allreduce_ms, start, stop);
dummy_kernel<<<1, 1, 0, stream>>>();
// warm up
for (int i = 0; i < warmup_iters; i++) {
fa.allreduce<T>(stream, self_data, result, data_size, threads,
block_limit);
}
CUDACHECK(cudaEventRecord(start, stream));
for (int i = 0; i < num_iters; i++) {
fa.allreduce<T>(stream, self_data, result, data_size, threads,
block_limit);
}
CUDACHECK(cudaEventRecord(stop, stream));
CUDACHECK(cudaStreamSynchronize(stream));
float duration_ms = 0;
cudaEventElapsedTime(&duration_ms, start, stop);
if (myRank == 0)
printf(
"Rank %d done, nGPUs:%d, sz (kb): %d, %d, %d, my time:%.2fus, nccl "
"time:%.2fus\n",
myRank, nRanks, data_size * sizeof(T) / 1024, threads, block_limit,
duration_ms * 1e3 / num_iters, allreduce_ms * 1e3 / num_iters);
// And wait for all the queued up work to complete
CUDACHECK(cudaStreamSynchronize(stream));
NCCLCHECK(ncclAllReduce(self_data_copy, self_data, data_size, ncclDtype,
ncclSum, comm, stream));
convert_data<T><<<108, 1024, 0, stream>>>(self_data, result, nccl_result,
my_result, data_size);
CUDACHECK(cudaStreamSynchronize(stream));
for (unsigned long j = 0; j < data_size; j++) {
auto diff = abs(nccl_result[j] - my_result[j]);
if (diff >= 4e-2) {
printf("Rank %d: Verification mismatch at %lld: %f != (my) %f, gt=%f\n",
#include <cuda.h>
#include <curand_kernel.h>
#include <stdio.h>
#include <stdlib.h>
#include <limits>
#include <vector>
#include "cuda_profiler_api.h"
#include "mpi.h"
#ifdef USE_ROCM
#include <hip/hip_bf16.h>
typedef __hip_bfloat16 nv_bfloat16;
#include "rccl/rccl.h"
#include "custom_all_reduce_hip.cuh"
#else
#include "nccl.h"
#include "custom_all_reduce.cuh"
#endif
#define MPICHECK(cmd) \
do { \
int e = cmd; \
if (e != MPI_SUCCESS) { \
printf("Failed: MPI error %s:%d '%d'\n", __FILE__, __LINE__, e); \
exit(EXIT_FAILURE); \
} \
} while (0)
#define NCCLCHECK(cmd) \
do { \
ncclResult_t r = cmd; \
if (r != ncclSuccess) { \
printf("Failed, NCCL error %s:%d '%s'\n", __FILE__, __LINE__, \
ncclGetErrorString(r)); \
exit(EXIT_FAILURE); \
} \
} while (0)
__global__ void dummy_kernel() {
#ifdef USE_ROCM
for (int i = 0; i < 100; i++) {
uint64_t start = wall_clock64();
uint64_t cycles_elapsed;
do {
cycles_elapsed = wall_clock64() - start;
} while (cycles_elapsed < 100);
}
#else
for (int i = 0; i < 100; i++) __nanosleep(1000000); // 100ms
#endif
}
template <typename T>
__global__ void set_data(T* data, int size, int myRank) {
for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size;
idx += gridDim.x * blockDim.x) {
data[idx] = myRank * 0.11f;
}
}
template <typename T>
__global__ void convert_data(const T* data1, const T* data2, double* fdata1,
double* fdata2, int size) {
for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size;
idx += gridDim.x * blockDim.x) {
fdata1[idx] = data1[idx];
fdata2[idx] = data2[idx];
}
}
__global__ void init_rand(curandState_t* state, int size, int nRanks) {
for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size;
idx += gridDim.x * blockDim.x) {
for (int i = 0; i < nRanks; i++) {
curand_init(i + 1, idx, 0, &state[idx * nRanks + i]);
}
}
}
template <typename T>
__global__ void gen_data(curandState_t* state, T* data, double* ground_truth,
int myRank, int nRanks, int size) {
for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size;
idx += gridDim.x * blockDim.x) {
double sum = 0.0;
for (int i = 0; i < nRanks; i++) {
double val = curand_uniform_double(&state[idx * nRanks + i]) * 4;
T hval = val; // downcast first
sum += static_cast<double>(hval);
if (i == myRank) data[idx] = hval;
}
ground_truth[idx] = sum;
}
}
template <typename T>
void run(int myRank, int nRanks, ncclComm_t& comm, int threads, int block_limit,
int data_size, bool performance_test) {
T* result;
cudaStream_t stream;
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
CUDACHECK(cudaMalloc(&result, data_size * sizeof(T)));
CUDACHECK(cudaMemset(result, 0, data_size * sizeof(T)));
cudaIpcMemHandle_t self_data_handle;
cudaIpcMemHandle_t data_handles[8];
vllm::Signal* buffer;
T* self_data_copy;
/**
* Allocate IPC buffer
*
* The first section is a temporary buffer for storing intermediate allreduce
* results, if a particular algorithm requires it. The second section is for
* the input to the allreduce. The actual API takes the input pointer as an
* argument (that is, they can and usually should be allocated separately).
* But since the input pointers and the temporary buffer all require IPC
* registration, they are allocated and registered together in the test for
* convenience.
*/
#ifdef USE_ROCM
CUDACHECK(hipExtMallocWithFlags(
(void**)&buffer, 2 * data_size * sizeof(T) + sizeof(vllm::Signal),
hipDeviceMallocUncached));
#else
CUDACHECK(
cudaMalloc(&buffer, 2 * data_size * sizeof(T) + sizeof(vllm::Signal)));
#endif
CUDACHECK(
cudaMemset(buffer, 0, 2 * data_size * sizeof(T) + sizeof(vllm::Signal)));
CUDACHECK(cudaMalloc(&self_data_copy, data_size * sizeof(T)));
CUDACHECK(cudaIpcGetMemHandle(&self_data_handle, buffer));
MPICHECK(MPI_Allgather(&self_data_handle, sizeof(cudaIpcMemHandle_t),
MPI_BYTE, data_handles, sizeof(cudaIpcMemHandle_t),
MPI_BYTE, MPI_COMM_WORLD));
void* rank_data;
size_t rank_data_sz = 16 * 1024 * 1024;
CUDACHECK(cudaMalloc(&rank_data, rank_data_sz));
std::vector<int64_t> offsets(nRanks, 0);
vllm::CustomAllreduce fa(buffer, rank_data, rank_data_sz, data_handles,
offsets, myRank);
auto* self_data =
reinterpret_cast<T*>(reinterpret_cast<char*>(buffer) +
sizeof(vllm::Signal) + data_size * sizeof(T));
// hack buffer registration
{
std::vector<std::string> handles;
handles.reserve(nRanks);
for (int i = 0; i < nRanks; i++) {
char* begin = (char*)&data_handles[i];
char* end = (char*)&data_handles[i + 1];
handles.emplace_back(begin, end);
}
std::vector<int64_t> offsets(nRanks,
sizeof(vllm::Signal) + data_size * sizeof(T));
fa.register_buffer(handles, offsets, self_data);
}
double* ground_truth;
CUDACHECK(cudaMallocHost(&ground_truth, data_size * sizeof(double)));
curandState_t* states;
CUDACHECK(cudaMalloc(&states, sizeof(curandState_t) * nRanks * data_size));
init_rand<<<108, 1024, 0, stream>>>(states, data_size, nRanks);
gen_data<T><<<108, 1024, 0, stream>>>(states, self_data, ground_truth, myRank,
nRanks, data_size);
CUDACHECK(cudaMemcpyAsync(self_data_copy, self_data, data_size * sizeof(T),
cudaMemcpyDeviceToDevice, stream));
cudaEvent_t start, stop;
CUDACHECK(cudaEventCreate(&start));
CUDACHECK(cudaEventCreate(&stop));
ncclDataType_t ncclDtype;
if (std::is_same<T, half>::value) {
ncclDtype = ncclFloat16;
} else if (std::is_same<T, nv_bfloat16>::value) {
ncclDtype = ncclBfloat16;
} else {
ncclDtype = ncclFloat;
}
double *nccl_result, *my_result;
CUDACHECK(cudaMallocHost(&nccl_result, data_size * sizeof(double)));
CUDACHECK(cudaMallocHost(&my_result, data_size * sizeof(double)));
if (performance_test) {
dummy_kernel<<<1, 1, 0, stream>>>();
constexpr int warmup_iters = 5;
constexpr int num_iters = 100;
// warmup
for (int i = 0; i < warmup_iters; i++) {
NCCLCHECK(ncclAllReduce(result, result, data_size, ncclDtype, ncclSum,
comm, stream));
}
CUDACHECK(cudaEventRecord(start, stream));
for (int i = 0; i < num_iters; i++) {
NCCLCHECK(ncclAllReduce(result, result, data_size, ncclDtype, ncclSum,
comm, stream));
}
CUDACHECK(cudaEventRecord(stop, stream));
CUDACHECK(cudaStreamSynchronize(stream));
float allreduce_ms = 0;
cudaEventElapsedTime(&allreduce_ms, start, stop);
dummy_kernel<<<1, 1, 0, stream>>>();
// warm up
for (int i = 0; i < warmup_iters; i++) {
fa.allreduce<T>(stream, self_data, result, data_size, threads,
block_limit);
}
CUDACHECK(cudaEventRecord(start, stream));
for (int i = 0; i < num_iters; i++) {
fa.allreduce<T>(stream, self_data, result, data_size, threads,
block_limit);
}
CUDACHECK(cudaEventRecord(stop, stream));
CUDACHECK(cudaStreamSynchronize(stream));
float duration_ms = 0;
cudaEventElapsedTime(&duration_ms, start, stop);
if (myRank == 0)
printf(
"Rank %d done, nGPUs:%d, sz (kb): %d, %d, %d, my time:%.2fus, nccl "
"time:%.2fus\n",
myRank, nRanks, data_size * sizeof(T) / 1024, threads, block_limit,
duration_ms * 1e3 / num_iters, allreduce_ms * 1e3 / num_iters);
// And wait for all the queued up work to complete
CUDACHECK(cudaStreamSynchronize(stream));
NCCLCHECK(ncclAllReduce(self_data_copy, self_data, data_size, ncclDtype,
ncclSum, comm, stream));
convert_data<T><<<108, 1024, 0, stream>>>(self_data, result, nccl_result,
my_result, data_size);
CUDACHECK(cudaStreamSynchronize(stream));
for (unsigned long j = 0; j < data_size; j++) {
auto diff = abs(nccl_result[j] - my_result[j]);
if (diff >= 4e-2) {
printf("Rank %d: Verification mismatch at %lld: %f != (my) %f, gt=%f\n",
myRank, j, nccl_result[j], my_result[j], ground_truth[j]);
break;
}
}
long double nccl_diffs = 0.0;
long double my_diffs = 0.0;
for (int j = 0; j < data_size; j++) {
nccl_diffs += abs(nccl_result[j] - ground_truth[j]);
my_diffs += abs(my_result[j] - ground_truth[j]);
}
if (myRank == 0)
std::cout << "average abs diffs: nccl: " << nccl_diffs / data_size
<< " me: " << my_diffs / data_size << std::endl;
} else {
for (int i = 0; i < 100; i++) {
fa.allreduce<T>(stream, self_data, result, data_size, threads,
block_limit);
CUDACHECK(cudaStreamSynchronize(stream));
NCCLCHECK(ncclAllReduce(self_data, self_data_copy, data_size, ncclDtype,
ncclSum, comm, stream));
convert_data<T><<<108, 1024, 0, stream>>>(
self_data_copy, result, nccl_result, my_result, data_size);
CUDACHECK(cudaStreamSynchronize(stream));
for (unsigned long j = 0; j < data_size; j++) {
auto diff = abs(nccl_result[j] - my_result[j]);
if (diff >= 4e-2) {
printf(
"Rank %d: Verification mismatch at %lld: %f != (my) %f, gt=%f\n",
myRank, j, nccl_result[j], my_result[j], ground_truth[j]);
break;
}
}
long double nccl_diffs = 0.0;
long double my_diffs = 0.0;
for (int j = 0; j < data_size; j++) {
nccl_diffs += abs(nccl_result[j] - ground_truth[j]);
my_diffs += abs(my_result[j] - ground_truth[j]);
}
if (myRank == 0)
std::cout << "average abs diffs: nccl: " << nccl_diffs / data_size
<< " me: " << my_diffs / data_size << std::endl;
} else {
for (int i = 0; i < 100; i++) {
fa.allreduce<T>(stream, self_data, result, data_size, threads,
block_limit);
CUDACHECK(cudaStreamSynchronize(stream));
NCCLCHECK(ncclAllReduce(self_data, self_data_copy, data_size, ncclDtype,
ncclSum, comm, stream));
convert_data<T><<<108, 1024, 0, stream>>>(
self_data_copy, result, nccl_result, my_result, data_size);
CUDACHECK(cudaStreamSynchronize(stream));
for (unsigned long j = 0; j < data_size; j++) {
auto diff = abs(nccl_result[j] - my_result[j]);
if (diff >= 4e-2) {
printf(
"Rank %d: Verification mismatch at %lld: %f != (my) %f, gt=%f\n",
myRank, j, nccl_result[j], my_result[j], ground_truth[j]);
break;
}
}
}
if (myRank == 0)
printf("Test passed: nGPUs:%d, sz (kb): %d, %d, %d\n", nRanks,
data_size * sizeof(T) / 1024, threads, block_limit);
// long double nccl_diffs = 0.0;
// long double my_diffs = 0.0;
// for (int j = 0; j < data_size; j++) {
// nccl_diffs += abs(nccl_result[j] - ground_truth[j]);
// my_diffs += abs(my_result[j] - ground_truth[j]);
// }
// if (myRank == 0)
// std::cout << "average abs diffs: nccl: " << nccl_diffs / data_size
// << " me: " << my_diffs / data_size << std::endl;
}
CUDACHECK(cudaFree(result));
CUDACHECK(cudaFree(self_data_copy));
CUDACHECK(cudaFree(rank_data));
CUDACHECK(cudaFree(buffer));
CUDACHECK(cudaFree(states));
CUDACHECK(cudaFreeHost(ground_truth));
CUDACHECK(cudaFreeHost(nccl_result));
CUDACHECK(cudaFreeHost(my_result));
CUDACHECK(cudaStreamDestroy(stream));
}
int main(int argc, char** argv) {
int nRanks, myRank;
MPICHECK(MPI_Init(&argc, &argv));
MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));
CUDACHECK(cudaSetDevice(myRank));
ncclUniqueId id;
ncclComm_t comm;
if (myRank == 0) ncclGetUniqueId(&id);
MPICHECK(MPI_Bcast(static_cast<void*>(&id), sizeof(id), MPI_BYTE, 0,
MPI_COMM_WORLD));
NCCLCHECK(ncclCommInitRank(&comm, nRanks, id, myRank));
bool performance_test = true;
cudaProfilerStart();
// Uncomment to scan through different block size configs.
// for (int threads : {256, 512, 1024}) {
// for (int block_limit = 16; block_limit < 112; block_limit += 4) {
// run<half>(myRank, nRanks, comm, threads, block_limit, 1024 * 1024,
// performance_test);
// }
// }
// Scan through different sizes to test performance.
for (int sz = 512; sz <= (8 << 20); sz *= 2) {
run<half>(myRank, nRanks, comm, 512, 36, sz + 8 * 47, performance_test);
}
cudaProfilerStop();
MPICHECK(MPI_Finalize());
return EXIT_SUCCESS;
}
break;
}
}
}
if (myRank == 0)
printf("Test passed: nGPUs:%d, sz (kb): %d, %d, %d\n", nRanks,
data_size * sizeof(T) / 1024, threads, block_limit);
// long double nccl_diffs = 0.0;
// long double my_diffs = 0.0;
// for (int j = 0; j < data_size; j++) {
// nccl_diffs += abs(nccl_result[j] - ground_truth[j]);
// my_diffs += abs(my_result[j] - ground_truth[j]);
// }
// if (myRank == 0)
// std::cout << "average abs diffs: nccl: " << nccl_diffs / data_size
// << " me: " << my_diffs / data_size << std::endl;
}
CUDACHECK(cudaFree(result));
CUDACHECK(cudaFree(self_data_copy));
CUDACHECK(cudaFree(rank_data));
CUDACHECK(cudaFree(buffer));
CUDACHECK(cudaFree(states));
CUDACHECK(cudaFreeHost(ground_truth));
CUDACHECK(cudaFreeHost(nccl_result));
CUDACHECK(cudaFreeHost(my_result));
CUDACHECK(cudaStreamDestroy(stream));
}
int main(int argc, char** argv) {
int nRanks, myRank;
MPICHECK(MPI_Init(&argc, &argv));
MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));
CUDACHECK(cudaSetDevice(myRank));
ncclUniqueId id;
ncclComm_t comm;
if (myRank == 0) ncclGetUniqueId(&id);
MPICHECK(MPI_Bcast(static_cast<void*>(&id), sizeof(id), MPI_BYTE, 0,
MPI_COMM_WORLD));
NCCLCHECK(ncclCommInitRank(&comm, nRanks, id, myRank));
bool performance_test = true;
cudaProfilerStart();
// for (int threads : {256, 512}) {
// for (int block_limit = 16; block_limit < 112; block_limit += 4) {
// run<half>(myRank, nRanks, comm, threads, block_limit, 4096 * 1024);
// }
// }
#ifdef USE_ROCM
for (int sz = 512; sz <= (8 << 20); sz *= 2) {
run<half>(myRank, nRanks, comm, 512, 16, sz + 8 * 47, performance_test);
}
#else
for (int sz = 512; sz <= (8 << 20); sz *= 2) {
run<half>(myRank, nRanks, comm, 512, 36, sz + 8 * 47, performance_test);
}
#endif
cudaProfilerStop();
MPICHECK(MPI_Finalize());
return EXIT_SUCCESS;
}
\ No newline at end of file
......@@ -374,7 +374,7 @@ void causal_conv1d_fwd(const at::Tensor& x, const at::Tensor& weight,
const std::optional<at::Tensor>& has_initial_state,
bool silu_activation, int64_t pad_slot_id);
#ifndef USE_ROCM
using fptr_t = int64_t;
fptr_t init_custom_ar(const std::vector<int64_t>& fake_ipc_ptrs,
torch::Tensor& rank_data, int64_t rank, bool full_nvlink);
......@@ -388,4 +388,8 @@ get_graph_buffer_ipc_meta(fptr_t _fa);
void register_graph_buffers(fptr_t _fa,
const std::vector<std::vector<int64_t>>& handles,
const std::vector<std::vector<int64_t>>& offsets);
#endif
std::tuple<int64_t, torch::Tensor> allocate_shared_buffer_and_handle(
int64_t size);
int64_t open_mem_handle(torch::Tensor& mem_handle);
void free_shared_buffer(int64_t buffer);
\ No newline at end of file
......@@ -710,7 +710,7 @@ TORCH_LIBRARY_EXPAND(CONCAT(TORCH_EXTENSION_NAME, _cuda_utils), cuda_utils) {
&get_max_shared_memory_per_block_device_attribute);
}
#ifndef USE_ROCM
TORCH_LIBRARY_EXPAND(CONCAT(TORCH_EXTENSION_NAME, _custom_ar), custom_ar) {
// Custom all-reduce kernels
custom_ar.def(
......@@ -728,7 +728,12 @@ TORCH_LIBRARY_EXPAND(CONCAT(TORCH_EXTENSION_NAME, _custom_ar), custom_ar) {
custom_ar.def("register_buffer", &register_buffer);
custom_ar.def("get_graph_buffer_ipc_meta", &get_graph_buffer_ipc_meta);
custom_ar.def("register_graph_buffers", &register_graph_buffers);
custom_ar.def("allocate_shared_buffer_and_handle",
&allocate_shared_buffer_and_handle);
custom_ar.def("open_mem_handle(Tensor mem_handle) -> int", &open_mem_handle);
custom_ar.impl("open_mem_handle", torch::kCPU, &open_mem_handle);
custom_ar.def("free_shared_buffer", &free_shared_buffer);
}
#endif
REGISTER_EXTENSION(TORCH_EXTENSION_NAME)
......@@ -93,7 +93,8 @@ def eager_allreduce(tp_size, pp_size, rank, distributed_init_port):
# communicate independently
num_communication = rank // tp_size + 1
sz = 1024
fa = get_tp_group().ca_comm
# fa = get_tp_group().ca_comm xiabo
fa = get_tp_group().device_communicator.ca_comm
inp = torch.ones(sz, dtype=torch.float32, device=device)
out = inp
for _ in range(num_communication):
......
......@@ -574,7 +574,17 @@ def multi_process_parallel(
# as compared to multiprocessing.
# NOTE: We need to set working_dir for distributed tests,
# otherwise we may get import errors on ray workers
ray.init(num_gpus=tp_size, runtime_env={"working_dir": VLLM_PATH})
# ray.init(num_gpus=tp_size, runtime_env={"working_dir": VLLM_PATH}) xiabo
# NOTE: Force ray not to use gitignore file as excluding, otherwise
# it will not move .so files to working dir.
# So we have to manually add some of large directories
os.environ["RAY_RUNTIME_ENV_IGNORE_GITIGNORE"] = "1"
ray.init(
runtime_env={
"working_dir": VLLM_PATH,
"excludes":
["build", ".git", "cmake-build-*", "shellcheck", "dist"]
})
distributed_init_port = get_open_port()
refs = []
......
......@@ -1527,6 +1527,18 @@ def register_graph_buffers(fa: int, handles: List[List[int]],
torch.ops._C_custom_ar.register_graph_buffers(fa, handles, offsets)
def allocate_shared_buffer_and_handle(size: int) -> tuple[int, torch.Tensor]:
return torch.ops._C_custom_ar.allocate_shared_buffer_and_handle(size)
def open_mem_handle(mem_handle: torch.Tensor):
return torch.ops._C_custom_ar.open_mem_handle(mem_handle)
def free_shared_buffer(ptr: int) -> None:
torch.ops._C_custom_ar.free_shared_buffer(ptr)
def read_cache(
keys: torch.Tensor,
values: torch.Tensor,
......
......@@ -1411,11 +1411,12 @@ class ParallelConfig:
if self.use_ray:
from vllm.executor import ray_utils
ray_utils.assert_ray_available()
if current_platform.is_rocm():
self.disable_custom_all_reduce = True
logger.info(
"Disabled the custom all-reduce kernel because it is not "
"supported on hcus.")
# xiabo
# if current_platform.is_rocm():
# self.disable_custom_all_reduce = True
# logger.info(
# "Disabled the custom all-reduce kernel because it is not "
# "supported on hcus.")
if self.ray_workers_use_nsight and not self.use_ray:
raise ValueError("Unable to use nsight profiling unless workers "
"run with Ray.")
......
......@@ -10,7 +10,7 @@ from torch.distributed import ProcessGroup
import vllm.envs as envs
from vllm import _custom_ops as ops
from vllm.distributed.device_communicators.cuda_wrapper import CudaRTLibrary
# from vllm.distributed.device_communicators.cuda_wrapper import CudaRTLibrary
from vllm.distributed.device_communicators.custom_all_reduce_utils import (
gpu_p2p_access_check)
from vllm.distributed.parallel_state import in_the_same_node_as
......@@ -56,7 +56,7 @@ class CustomAllreduce:
def __init__(self,
group: ProcessGroup,
device: Union[int, str, torch.device],
max_size=8192 * 1024) -> None:
max_size=8192 * 1024 * 2) -> None:
"""
Args:
group: the process group to work on. If None, it will use the
......@@ -73,6 +73,8 @@ class CustomAllreduce:
if not custom_ar:
# disable because of missing custom allreduce library
# e.g. in a non-cuda environment
logger.warning("Custom allreduce is disabled because "
"of missing custom allreduce library")
return
self.group = group
......@@ -88,6 +90,7 @@ class CustomAllreduce:
return
rank = dist.get_rank(group=self.group)
self.rank = rank
world_size = dist.get_world_size(group=self.group)
if world_size == 1:
# No need to initialize custom allreduce for single GPU case.
......@@ -129,10 +132,10 @@ class CustomAllreduce:
# test nvlink first, this will filter out most of the cases
# where custom allreduce is not supported
# this checks hardware and driver support for NVLink
assert current_platform.is_cuda()
from vllm.platforms.cuda import CudaPlatform
cuda_platform: CudaPlatform = current_platform
full_nvlink = cuda_platform.is_full_nvlink(physical_device_ids)
assert current_platform.is_cuda_alike()
full_nvlink = current_platform.is_fully_connected_nvlink_or_xgmi(
physical_device_ids)
if world_size > 2 and not full_nvlink:
logger.warning(
"Custom allreduce is disabled because it's not supported on"
......@@ -142,19 +145,22 @@ class CustomAllreduce:
# test P2P capability, this checks software/cudaruntime support
# this is expensive to compute at the first time
# then we cache the result
if not _can_p2p(rank, world_size):
logger.warning(
"Custom allreduce is disabled because your platform lacks "
"GPU P2P capability or P2P test failed. To silence this "
"warning, specify disable_custom_all_reduce=True explicitly.")
return
# xiabo
# if not _can_p2p(rank, world_size):
# logger.warning(
# "Custom allreduce is disabled because your platform lacks "
# "GPU P2P capability or P2P test failed. To silence this "
# "warning, specify disable_custom_all_reduce=True explicitly.")
# return
self.disabled = False
# Buffers memory are owned by this Python class and passed to C++.
# Meta data composes of two parts: meta data for synchronization and a
# temporary buffer for storing intermediate allreduce results.
self.meta_ptrs = self.create_shared_buffer(ops.meta_size() + max_size,
group=group)
group=group,
uncached=True)
# group=group) xiabo
# This is a pre-registered IPC buffer. In eager mode, input tensors
# are first copied into this buffer before allreduce is performed
self.buffer_ptrs = self.create_shared_buffer(max_size, group=group)
......@@ -174,38 +180,38 @@ class CustomAllreduce:
self.full_nvlink)
ops.register_buffer(self._ptr, self.buffer_ptrs)
@staticmethod
def create_shared_buffer(
size_in_bytes: int,
group: Optional[ProcessGroup] = None) -> List[int]:
"""
Creates a shared buffer and returns a list of pointers
representing the buffer on all processes in the group.
"""
lib = CudaRTLibrary()
pointer = lib.cudaMalloc(size_in_bytes)
handle = lib.cudaIpcGetMemHandle(pointer)
world_size = dist.get_world_size(group=group)
rank = dist.get_rank(group=group)
handles = [None] * world_size
dist.all_gather_object(handles, handle, group=group)
pointers: List[int] = []
for i, h in enumerate(handles):
if i == rank:
pointers.append(pointer.value) # type: ignore
else:
pointers.append(
lib.cudaIpcOpenMemHandle(h).value) # type: ignore
return pointers
@staticmethod
def free_shared_buffer(pointers: List[int],
group: Optional[ProcessGroup] = None) -> None:
rank = dist.get_rank(group=group)
lib = CudaRTLibrary()
lib.cudaFree(ctypes.c_void_p(pointers[rank]))
# @staticmethod
# def create_shared_buffer(
# size_in_bytes: int,
# group: Optional[ProcessGroup] = None) -> List[int]:
# """
# Creates a shared buffer and returns a list of pointers
# representing the buffer on all processes in the group.
# """
# lib = CudaRTLibrary()
# pointer = lib.cudaMalloc(size_in_bytes)
# handle = lib.cudaIpcGetMemHandle(pointer)
# world_size = dist.get_world_size(group=group)
# rank = dist.get_rank(group=group)
# handles = [None] * world_size
# dist.all_gather_object(handles, handle, group=group)
# pointers: List[int] = []
# for i, h in enumerate(handles):
# if i == rank:
# pointers.append(pointer.value) # type: ignore
# else:
# pointers.append(
# lib.cudaIpcOpenMemHandle(h).value) # type: ignore
# return pointers
# @staticmethod
# def free_shared_buffer(pointers: List[int],
# group: Optional[ProcessGroup] = None) -> None:
# rank = dist.get_rank(group=group)
# lib = CudaRTLibrary()
# lib.cudaFree(ctypes.c_void_p(pointers[rank]))
@contextmanager
def capture(self):
......@@ -284,7 +290,7 @@ class CustomAllreduce:
return None
if self._IS_CAPTURING:
if torch.cuda.is_current_stream_capturing():
return self.all_reduce(input, registered=True)
return self.all_reduce(input, registered=False)
else:
# If warm up, mimic the allocation pattern since custom
# allreduce is out-of-place.
......@@ -299,8 +305,35 @@ class CustomAllreduce:
if not self.disabled and self._ptr:
ops.dispose(self._ptr)
self._ptr = 0
self.free_shared_buffer(self.meta_ptrs)
self.free_shared_buffer(self.buffer_ptrs)
self.free_shared_buffer(self.meta_ptrs, rank=self.rank)
self.free_shared_buffer(self.buffer_ptrs, rank=self.rank)
def __del__(self):
self.close()
@staticmethod
def create_shared_buffer(size_in_bytes: int,
group: Optional[ProcessGroup] = None,
uncached: Optional[bool] = False) -> List[int]:
pointer, handle = ops.allocate_shared_buffer_and_handle(size_in_bytes)
world_size = dist.get_world_size(group=group)
rank = dist.get_rank(group=group)
handles = [None] * world_size
dist.all_gather_object(handles, handle, group=group)
pointers: List[int] = []
for i, h in enumerate(handles):
if i == rank:
pointers.append(pointer) # type: ignore
else:
pointers.append(ops.open_mem_handle(h))
return pointers
@staticmethod
def free_shared_buffer(pointers: List[int],
group: Optional[ProcessGroup] = None,
rank: Optional[int] = 0) -> None:
if rank is None:
rank = dist.get_rank(group=group)
ops.free_shared_buffer(pointers[rank])
\ No newline at end of file
......@@ -18,6 +18,10 @@ else:
logger = init_logger(__name__)
from amdsmi import (AmdSmiException, amdsmi_get_gpu_asic_info,
amdsmi_get_processor_handles, amdsmi_init,
amdsmi_shut_down, amdsmi_topo_get_link_type)
try:
import vllm._C # noqa: F401
except ImportError as e:
......@@ -104,6 +108,30 @@ class RocmPlatform(Platform):
def get_device_name(cls, device_id: int = 0) -> str:
return torch.cuda.get_device_name(device_id)
@staticmethod
def is_fully_connected_nvlink_or_xgmi(
physical_device_ids: List[int]) -> bool:
"""
Query if the set of gpus are fully connected by xgmi (1 hop)
"""
handles = [
amdsmi_get_processor_handles()[i] for i in physical_device_ids
]
for i, handle in enumerate(handles):
for j, peer_handle in enumerate(handles):
if i < j:
try:
link_type = amdsmi_topo_get_link_type(
handle, peer_handle)
# type is 2 for XGMI
if link_type["hops"] != 1 or link_type["type"] != 2:
return False
except AmdSmiException as error:
logger.error("AMD 1 hop XGMI detection failed.",
exc_info=error)
return False
return True
@classmethod
def get_device_total_memory(cls, device_id: int = 0) -> int:
device_props = torch.cuda.get_device_properties(device_id)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment