#pragma once #include "type_convert.cuh" #include "dispatch_utils.h" #include #include #include #include #include #include #include #include #include #include #include #include // #if defined(USE_ROCM) typedef __hip_bfloat16 nv_bfloat16; // #endif #include #include #include #include #include #include #ifndef USE_ROCM #include #else #include #endif namespace vllm { #define CUDACHECK(cmd) \ do { \ cudaError_t e = cmd; \ if (e != cudaSuccess) { \ printf("Failed: Cuda error %s:%d '%s'\n", __FILE__, __LINE__, \ cudaGetErrorString(e)); \ exit(EXIT_FAILURE); \ } \ } while (0) // Maximal number of blocks in allreduce kernel. constexpr int kMaxBlocks = 128; // Default number of blocks in allreduce kernel. #ifndef USE_ROCM const int defaultBlockLimit = 36; CUpointer_attribute rangeStartAddrAttr = CU_POINTER_ATTRIBUTE_RANGE_START_ADDR; #else const int defaultBlockLimit = 16; hipPointer_attribute rangeStartAddrAttr = HIP_POINTER_ATTRIBUTE_RANGE_START_ADDR; #endif // Counter may overflow, but it's fine since unsigned int overflow is // well-defined behavior. using FlagType = uint32_t; // Two sets of peer counters are needed for two syncs: starting and ending an // operation. The reason is that it's possible for peer GPU block to arrive at // the second sync point while the current GPU block haven't passed the first // sync point. Thus, peer GPU may write counter+1 while current GPU is busy // waiting for counter. We use alternating counter array to avoid this // possibility. 
// Per-rank synchronization area used by the barrier helpers in this file.
// `start` and `end` are indexed [block][rank]; `_flag` holds one counter per
// block. Each member begins on its own 128-byte boundary.
struct Signal {
  alignas(128) FlagType start[kMaxBlocks][16];
  alignas(128) FlagType end[kMaxBlocks][16];
  alignas(128) FlagType _flag[kMaxBlocks];  // incremental flags for each rank
};

// One data pointer per rank (at most 16 ranks).
struct __align__(16) RankData {
  const void* ptrs[16];
};

// One Signal pointer per rank (at most 16 ranks).
struct __align__(16) RankSignals {
  Signal* signals[16];
};

// like std::array, but aligned
//   T  - element type
//   sz - number of elements; the struct is aligned to alignof(T) * sz bytes
template <typename T, int sz>
struct __align__(alignof(T) * sz) array_t {
  T data[sz];
  using type = T;
  static constexpr int size = sz;
};

// use packed type to maximize memory efficiency
// goal: generate ld.128 and st.128 instructions
template <typename T>
struct packed_t {
  // A packed vector is meant to cover exactly 16 bytes, so the element size
  // has to divide 16.
  static_assert(16 % sizeof(T) == 0,
                "packed_t requires sizeof(T) to divide 16 bytes");

  // the (P)acked type for load/store
  using P = array_t<T, 16 / sizeof(T)>;
  // the (A)ccumulator type for reduction
  using A = array_t<float, 16 / sizeof(T)>;
  // Vector with one int8 lane per element of P.
  // NOTE(review): element type inferred from the quantizing kernels, which
  // fill F lanes with float_to_int8_rn() results - confirm.
  using F = array_t<int8_t, 16 / sizeof(T)>;
};

#define DINLINE __device__ __forceinline__

// scalar cast functions
DINLINE float upcast_s(half val) { return __half2float(val); }

// Converts a float back to the storage type T; only the explicit
// specializations below are defined.
template <typename T>
DINLINE T downcast_s(float val);

template <>
DINLINE half downcast_s(float val) {
  return __float2half(val);
}

// scalar add functions
// for some reason when compiling with Pytorch, the + operator for half and
// bfloat is disabled so we call the intrinsics directly
DINLINE half& assign_add(half& a, half b) {
  a = __hadd(a, b);
  return a;
}

DINLINE float& assign_add(float& a, float b) { return a += b; }

// #if (__CUDA_ARCH__ >= 800 || !defined(__CUDA_ARCH__))
DINLINE float upcast_s(nv_bfloat16 val) { return __bfloat162float(val); }

template <>
DINLINE nv_bfloat16 downcast_s(float val) {
  return __float2bfloat16(val);
}

DINLINE nv_bfloat16& assign_add(nv_bfloat16& a, nv_bfloat16 b) {
  a = __hadd(a, b);
  return a;
}
// #endif

// In-place element-wise accumulation: a.data[i] += b.data[i] for every lane,
// using the scalar assign_add overload for T. Returns a reference to `a`.
template <typename T, int N>
DINLINE array_t<T, N>& packed_assign_add(array_t<T, N>& a, array_t<T, N> b) {
#pragma unroll
  for (int i = 0; i < N; i++) {
    assign_add(a.data[i], b.data[i]);
  }
  return a;
}

/**********************************************************/
// Element-wise sum of two packed vectors. Each lane is added in float and the
// result is converted back to P's element type on assignment.
// NOTE(review): `T` is accepted and ignored so that call sites written as
// vec_add(a, b), vec_add<T>(a, b) or vec_add<T, P>(a, b) all resolve; the
// exact template head should be confirmed against the callers.
template <typename T = void, typename P>
DINLINE P vec_add(const P& a, const P& b) {
  P sum_tmp;
#pragma unroll
  for (int i = 0; i < P::size; ++i)
    sum_tmp.data[i] =
        static_cast<float>(a.data[i]) + static_cast<float>(b.data[i]);
  return sum_tmp;
}
template __inline__ __device__ T WarpReduceSum(T val) { #pragma unroll for (int offset = reducesize / 2; offset > 0; offset >>= 1) { val += WARP_SHFL_DOWN(val, offset); } return val; } template DINLINE T BlockReduce(T val, T* shared) { const int lid = threadIdx.x % 64; const int wid = threadIdx.x / 64; const int block_size = blockDim.x; const int shared_size = block_size / 64; val = WarpReduceSum(val); if(block_size==64) return val; if (lid == 0 && wid < shared_size) { shared[wid] = val; } __syncthreads(); val = 0.f; if (wid == 0 && lid < shared_size) { val= shared[lid]; val = WarpReduceSum(val); } return val; } template DINLINE P fused_add_rms_norm(P const& residual, P const& gamma, int hidden_dim, float eps) { static constexpr int VEC_SIZE = 16 / sizeof(T); __shared__ float s_val; float trstd; P norm_out; float acc = 0.0f; #pragma unroll for (int i = 0; i < VEC_SIZE; ++i) { float v = static_cast(residual.data[i]); acc += v * v; } __shared__ float r_sum[16]; acc = BlockReduce(acc, r_sum); if (threadIdx.x == 0) s_val = rsqrtf(acc / hidden_dim + eps); __syncthreads(); trstd = s_val; #pragma unroll for (int i = 0; i < VEC_SIZE; ++i) { norm_out.data[i] = static_cast(static_cast(residual.data[i]) * trstd * static_cast(gamma.data[i])); } return norm_out; } static inline __device__ int8_t float_to_int8_rn(float x) { #ifdef USE_ROCM static constexpr auto i8_min = static_cast(std::numeric_limits::min()); static constexpr auto i8_max = static_cast(std::numeric_limits::max()); float dst = std::nearbyint(x); dst = (dst < i8_min) ? i8_min : (dst > i8_max) ? 
i8_max : dst; return static_cast(dst); #else // CUDA path uint32_t dst; asm volatile("cvt.rni.sat.s8.f32 %0, %1;" : "=r"(dst) : "f"(x)); return reinterpret_cast(dst); #endif } template __inline__ __device__ T WarpReduceMax(T val) { #pragma unroll for (int offset = reducesize / 2; offset > 0; offset >>= 1) { val = fmaxf(val, WARP_SHFL_DOWN(val, offset)); } return val; } template DINLINE T BlockReduceMax_ROW(T val, T* shared) { const int lid = threadIdx.x % 64; const int wid = threadIdx.x / 64; const int block_size = blockDim.x; const int shared_size = block_size / 64; val = WarpReduceMax(val); if(block_size==64) return val; if (lid == 0 && wid < shared_size) { shared[wid] = val; } __syncthreads(); if (wid == 0 && lid(val); } return val; } template DINLINE array_t upcast(array_t val) { if constexpr (std::is_same::value) { return val; } else { array_t out; #pragma unroll for (int i = 0; i < N; i++) { out.data[i] = static_cast(val.data[i]); } return out; } } template DINLINE O downcast(array_t val) { if constexpr (std::is_same::value) { return val; } else { O out; #pragma unroll for (int i = 0; i < O::size; i++) { out.data[i] = static_cast(val.data[i]); } return out; } } #if 0 static DINLINE void st_flag_release(FlagType* flag_addr, FlagType flag) { #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700 asm volatile("st.release.sys.global.u32 [%1], %0;" ::"r"(flag), "l"(flag_addr)); #else asm volatile("membar.sys; st.volatile.global.u32 [%1], %0;" ::"r"(flag), "l"(flag_addr)); #endif } static DINLINE FlagType ld_flag_acquire(FlagType* flag_addr) { FlagType flag; #if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 700 asm volatile("ld.acquire.sys.global.u32 %0, [%1];" : "=r"(flag) : "l"(flag_addr)); #else asm volatile("ld.volatile.global.u32 %0, [%1]; membar.gl;" : "=r"(flag) : "l"(flag_addr)); #endif return flag; } static DINLINE void st_flag_volatile(FlagType* flag_addr, FlagType flag) { asm volatile("st.volatile.global.u32 [%1], %0;" ::"r"(flag), "l"(flag_addr)); } static 
DINLINE FlagType ld_flag_volatile(FlagType* flag_addr) { FlagType flag; asm volatile("ld.volatile.global.u32 %0, [%1];" : "=r"(flag) : "l"(flag_addr)); return flag; } // This function is meant to be used as the first synchronization in the all // reduce kernel. Thus, it doesn't need to make any visibility guarantees for // prior memory accesses. Note: volatile writes will not be reordered against // other volatile writes. template DINLINE void barrier_at_start(const RankSignals& sg, Signal* self_sg, int rank) { uint32_t flag = self_sg->_flag[blockIdx.x] + 1; if (threadIdx.x < ngpus) { auto peer_counter_ptr = &sg.signals[threadIdx.x]->start[blockIdx.x][rank]; auto self_counter_ptr = &self_sg->start[blockIdx.x][threadIdx.x]; // Write the expected counter value to peer and wait for correct value // from peer. st_flag_volatile(peer_counter_ptr, flag); while (ld_flag_volatile(self_counter_ptr) != flag); } __syncthreads(); // use one thread to update flag if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag; } // This function is meant to be used as the second or the final // synchronization barrier in the all reduce kernel. If it's the final // synchronization barrier, we don't need to make any visibility guarantees // for prior memory accesses. template DINLINE void barrier_at_end(const RankSignals& sg, Signal* self_sg, int rank) { __syncthreads(); uint32_t flag = self_sg->_flag[blockIdx.x] + 1; if (threadIdx.x < ngpus) { auto peer_counter_ptr = &sg.signals[threadIdx.x]->end[blockIdx.x][rank]; auto self_counter_ptr = &self_sg->end[blockIdx.x][threadIdx.x]; // Write the expected counter value to peer and wait for correct value from // peer. 
if constexpr (!final_sync) { st_flag_release(peer_counter_ptr, flag); while (ld_flag_acquire(self_counter_ptr) != flag); } else { st_flag_volatile(peer_counter_ptr, flag); while (ld_flag_volatile(self_counter_ptr) != flag); } } if constexpr (!final_sync) __syncthreads(); // use one thread to update flag if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag; } #else template DINLINE void barrier_at_start(const RankSignals& sg, Signal* self_sg, int rank) { uint32_t flag = self_sg->_flag[blockIdx.x] + 1; //当前线程块标记+1 if (threadIdx.x < ngpus) { // simultaneously write to the corresponding flag of all ranks. // Latency = 1 p2p write // __scoped_atomic_store_n(&sg.signals[threadIdx.x]->start[blockIdx.x][rank], // flag, __ATOMIC_RELAXED, __MEMORY_SCOPE_SYSTEM); // 将每个peer GPU对应线程块的本rank flag填入 __atomic_store_n(&sg.signals[threadIdx.x]->start[blockIdx.x][rank], flag, __ATOMIC_RELAXED); // wait until we got true from all ranks // while (__scoped_atomic_load_n(&self_sg->start[blockIdx.x][threadIdx.x], // __ATOMIC_RELAXED, // __MEMORY_SCOPE_DEVICE) < flag); //等待对应blockidx.x处理的数据的peer gpu到达 while (__atomic_load_n(&self_sg->start[blockIdx.x][threadIdx.x], __ATOMIC_RELAXED) < flag); } __syncthreads(); // use one thread to update flag if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag; } template DINLINE void barrier_at_end(const RankSignals& sg, Signal* self_sg, int rank) { __syncthreads(); uint32_t flag = self_sg->_flag[blockIdx.x] + 1; if (threadIdx.x < ngpus) { // simultaneously write to the corresponding flag of all ranks. // Latency = 1 p2p write // __scoped_atomic_store_n(&sg.signals[threadIdx.x]->end[blockIdx.x][rank], // flag, // final_sync ? __ATOMIC_RELAXED : __ATOMIC_RELEASE, // __MEMORY_SCOPE_SYSTEM); // 告诉其他GPU 本block Reduce完毕 __atomic_store_n(&sg.signals[threadIdx.x]->end[blockIdx.x][rank], flag, final_sync ? 
__ATOMIC_RELAXED : __ATOMIC_RELEASE); // wait until we got true from all ranks // while ( // __scoped_atomic_load_n(&self_sg->end[blockIdx.x][threadIdx.x], // final_sync ? __ATOMIC_RELAXED : __ATOMIC_ACQUIRE, // __MEMORY_SCOPE_DEVICE) < flag); // 当前block处理的 hs的其他GPU处理完毕 while (__atomic_load_n(&self_sg->end[blockIdx.x][threadIdx.x], final_sync ? __ATOMIC_RELAXED : __ATOMIC_ACQUIRE) < flag); } if constexpr (!final_sync) __syncthreads(); // use one thread to update flag if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag; } template DINLINE void barrier_at_end_fuse(const RankSignals& sg, Signal* self_sg, int rank) { __syncthreads(); uint32_t flag = self_sg->_flag[blockIdx.x] + 1; if (threadIdx.x < ngpus) { // simultaneously write to the corresponding flag of all ranks. // Latency = 1 p2p write // __scoped_atomic_store_n(&sg.signals[threadIdx.x]->end[blockIdx.x][rank], // flag, // final_sync ? __ATOMIC_RELAXED : __ATOMIC_RELEASE, // __MEMORY_SCOPE_SYSTEM); __atomic_store_n(&sg.signals[threadIdx.x]->end[blockIdx.x][rank], flag, final_sync ? __ATOMIC_RELAXED : __ATOMIC_RELEASE); // wait until we got true from all ranks // while ( // __scoped_atomic_load_n(&self_sg->end[blockIdx.x][threadIdx.x], // final_sync ? __ATOMIC_RELAXED : __ATOMIC_ACQUIRE, // __MEMORY_SCOPE_DEVICE) < flag); while (__atomic_load_n(&self_sg->end[blockIdx.x][threadIdx.x], final_sync ? __ATOMIC_RELAXED : __ATOMIC_ACQUIRE) < flag); } __syncthreads(); // use one thread to update flag if (threadIdx.x == 0) self_sg->_flag[blockIdx.x] = flag; } #endif template DINLINE P packed_reduce(const P* ptrs[], int idx) { A tmp = upcast(ptrs[0][idx]); #pragma unroll for (int i = 1; i < ngpus; i++) { packed_assign_add(tmp, upcast(ptrs[i][idx])); } return downcast

(tmp); } template __global__ void __launch_bounds__(512, 1) cross_device_reduce_1stage(RankData* _dp, RankSignals sg, Signal* self_sg, T* __restrict__ result, int rank, int size) { using P = typename packed_t::P; using A = typename packed_t::A; // note: we don't reorder the address so the accumulation order is the same // for all ranks, ensuring bitwise identical results auto dp = *_dp; barrier_at_start(sg, self_sg, rank); // do the actual reduction for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size; idx += gridDim.x * blockDim.x) { ((P*)result)[idx] = packed_reduce((const P**)&dp.ptrs[0], idx); } barrier_at_end(sg, self_sg, rank); } template DINLINE P* get_tmp_buf(Signal* sg) { return (P*)(((Signal*)sg) + 1); } template __global__ void __launch_bounds__(1024, 1) cross_device_reduce_2stage_fuse_norm(RankData* _dp, RankSignals sg, Signal* self_sg, T* __restrict__ result, int rank, int size, int hidden_dim, T* residual_in, T* rms_gamma, float eps, std::array begin_tokens, std::array token_num_per_ranks) { static constexpr int VEC_SIZE = 16 / sizeof(T); int H_D_word_num = hidden_dim / VEC_SIZE; int token_id = blockIdx.x; // local token id int access_id_in_token = threadIdx.x; // 当前token内数据部分 int token_stride = gridDim.x; // int access_id = token_id * H_D_word_num + access_id_in_token; // local token id * (token in size) int access_stride = token_stride * H_D_word_num; // gridDim.x * (token in size) using P = typename packed_t::P; using A = typename packed_t::A; const P* ptrs[ngpus]; P* tmps[ngpus]; #pragma unroll for (int i = 0; i < ngpus; ++i) { int target = (rank + i) % ngpus; ptrs[i] = (const P*)_dp->ptrs[target]; tmps[i] = get_tmp_buf

(sg.signals[target]); } int start = begin_tokens[rank] * H_D_word_num; int part = (begin_tokens[rank] + token_num_per_ranks[rank]) * H_D_word_num; auto tmp_out = tmps[0]; // 当前rank的 (meta_data + sizeof(signal)) 偏移 barrier_at_start(sg, self_sg, rank); #pragma unroll for (int idx = access_id + start; idx < part; idx+=access_stride) { tmp_out[idx] = packed_reduce(ptrs, idx); #pragma unroll for (int r = 0; r < ngpus; ++r) tmps[r][idx] = tmp_out[idx]; //将当前GPU处理的数据--->其他GPU的对应问题 } barrier_at_end(sg, self_sg, rank); //debug --- 验证reduce结果 // for (int r = 0; r < ngpus; ++r) { // int cm_access_id = access_id + begin_tokens[r] * H_D_word_num; // int cm_token_id = token_id + begin_tokens[r]; // int cm_token_access = (begin_tokens[r] + token_num_per_ranks[r]) * H_D_word_num; // for (int idx = cm_access_id; idx < cm_token_access; idx += access_stride) // ((P*)result)[idx] = tmp_out[idx]; // } P m_residual_val, m_gamm_val; m_gamm_val = ((P*)rms_gamma)[access_id_in_token]; #pragma unroll for (int r = 0; r < ngpus; ++r) { int cm_access_id = access_id + begin_tokens[r] * H_D_word_num; int cm_token_id = token_id + begin_tokens[r]; int cm_tot_access = (begin_tokens[r] + token_num_per_ranks[r]) * H_D_word_num; for (int idx = cm_access_id; idx < cm_tot_access; idx += access_stride) { P sum_val; sum_val = tmp_out[idx]; m_residual_val =((P*)residual_in)[idx]; sum_val = vec_add(sum_val, m_residual_val); sum_val = fused_add_rms_norm(sum_val, m_gamm_val, hidden_dim, eps); ((P*)result)[idx] = sum_val; } } } template __global__ void __launch_bounds__(1024, 1) cross_device_reduce_1stage_norm_quant(RankData* _dp, RankSignals sg, Signal* self_sg, T_out* __restrict__ result, int rank, int size, int hidden_dim, T* residual_in, T* rms_gamma, float* __restrict__ scales, float eps, T* __restrict__ norm_res) { // static constexpr int VEC_SIZE = 16 / sizeof(T); static constexpr int VEC_SIZE = packed_t::P::size; int H_D_word_num = hidden_dim / VEC_SIZE; int token_id = blockIdx.x; int access_id_in_token 
= threadIdx.x; int token_stride = gridDim.x; int access_id = token_id * H_D_word_num + access_id_in_token; int access_stride = token_stride * H_D_word_num; using P = typename packed_t::P; using A = typename packed_t::A; using F = typename packed_t::F; P m_residual_val, m_gamm_val; m_gamm_val = reinterpret_cast(rms_gamma)[access_id_in_token]; auto dp = *_dp; P sum_val; barrier_at_start(sg, self_sg, rank); sum_val = packed_reduce((const P**)&dp.ptrs[0], access_id); barrier_at_end(sg, self_sg, rank); if constexpr(isResidual) { m_residual_val = reinterpret_cast(residual_in)[access_id]; sum_val = vec_add(m_residual_val, sum_val); ((P*)residual_in)[access_id] = sum_val; } __shared__ float s_val; P norm_out; float acc = 0.f; #pragma unroll for (int i = 0; i < VEC_SIZE; ++i) { float v = static_cast(sum_val.data[i]); acc += v * v; } __shared__ float r_sum[16]; acc = BlockReduce(acc, r_sum); if (threadIdx.x == 0) s_val = rsqrt(acc / hidden_dim + eps); __syncthreads(); float block_absmax_val_maybe = 0.f; #pragma unroll for (int i = 0; i < VEC_SIZE; ++i) { norm_out.data[i] = static_cast(sum_val.data[i]) * s_val * static_cast(m_gamm_val.data[i]); block_absmax_val_maybe = fmaxf(block_absmax_val_maybe, fabs(norm_out.data[i])); } block_absmax_val_maybe = BlockReduceMax_ROW(block_absmax_val_maybe,r_sum); // __shared__ float s_token_scale; float scale = 0.0f; if (threadIdx.x == 0) { scale = block_absmax_val_maybe; s_token_scale = scale; } __syncthreads(); float inv_s = (s_token_scale == 0.f) ? 
0.f : 127.f / s_token_scale; F out_vec; #pragma unroll for (int i = 0; i < VEC_SIZE; ++i) out_vec.data[i] = float_to_int8_rn(norm_out.data[i] * inv_s); constexpr float qmax = 127.0f; constexpr float min_scale = 1.19209e-07f; ((F*)result)[access_id] = out_vec; if constexpr (update_input) ((P*)norm_res)[access_id] = norm_out; if (threadIdx.x == 0) scales[blockIdx.x] = fmaxf(scale/qmax, min_scale); } template __global__ void __launch_bounds__(1024, 1) cross_device_reduce_2stage_fuse_norm_quant(RankData* _dp, RankSignals sg, Signal* self_sg, T_out* __restrict__ result, int rank, int size, int hidden_dim, T* residual_in, T* rms_gamma, float* __restrict__ scales, float eps, T* __restrict__ norm_res, std::array begin_tokens, std::array token_num_per_ranks) { static constexpr int VEC_SIZE = 16 / sizeof(T); int H_D_word_num = hidden_dim / VEC_SIZE; int token_id = blockIdx.x; // local token id int access_id_in_token = threadIdx.x; // 当前token内数据部分 int token_stride = gridDim.x; // int access_id = token_id * H_D_word_num + access_id_in_token; // local token id * (token in size) int access_stride = token_stride * H_D_word_num; // gridDim.x * (token in size) using P = typename packed_t::P; using A = typename packed_t::A; using F = typename packed_t::F; const P* ptrs[ngpus]; P* tmps[ngpus]; #pragma unroll for (int i = 0; i < ngpus; ++i) { int target = (rank + i) % ngpus; ptrs[i] = (const P*)_dp->ptrs[target]; tmps[i] = get_tmp_buf

(sg.signals[target]); } int start = begin_tokens[rank] * H_D_word_num; int part = (begin_tokens[rank] + token_num_per_ranks[rank]) * H_D_word_num; auto tmp_out = tmps[0]; // 当前rank的 (meta_data + sizeof(signal)) 偏移 auto input = ptrs[0]; barrier_at_start(sg, self_sg, rank); #pragma unroll for (int idx = access_id + start; idx < part; idx+=access_stride) { tmp_out[idx] = packed_reduce(ptrs, idx); #pragma unroll for (int r = 0; r < ngpus; ++r) tmps[r][idx] = tmp_out[idx]; //将当前GPU处理的数据--->其他GPU的对应问题 } barrier_at_end(sg, self_sg, rank); P m_residual_val, m_gamm_val; m_gamm_val = reinterpret_cast(rms_gamma)[access_id_in_token]; #pragma unroll for (int r = 0; r < ngpus; ++r) { int cm_access_id = access_id + begin_tokens[r] * H_D_word_num; int cm_token_id = token_id + begin_tokens[r]; int cm_tot_access = (begin_tokens[r] + token_num_per_ranks[r]) * H_D_word_num; for (int idx = cm_access_id, tidx = cm_token_id; idx < cm_tot_access; idx += access_stride, tidx += token_stride) { P sum_val; sum_val = tmp_out[idx]; if constexpr (isResidual) { m_residual_val = reinterpret_cast(residual_in)[idx]; sum_val = vec_add(sum_val, m_residual_val); ((P*)residual_in)[idx] = sum_val; } __shared__ float s_val; P norm_out; float acc = 0.0f; #pragma unroll for (int i = 0; i < VEC_SIZE; ++i) { float v = static_cast(sum_val.data[i]); acc += v * v; } __shared__ float r_sum[16]; acc = BlockReduce(acc, r_sum); if (threadIdx.x == 0) s_val = rsqrtf(acc / hidden_dim + eps); __syncthreads(); float block_absmax_val_maybe = 0.f; #pragma unroll for (int i = 0; i < VEC_SIZE; ++i) { norm_out.data[i] = static_cast(static_cast(sum_val.data[i]) * s_val * static_cast(m_gamm_val.data[i])); block_absmax_val_maybe = fmaxf(block_absmax_val_maybe, fabs(norm_out.data[i])); } block_absmax_val_maybe = BlockReduceMax_ROW(block_absmax_val_maybe, r_sum); __shared__ float s_token_scale; float scale = 0.0f; if (threadIdx.x == 0) { scale = block_absmax_val_maybe; s_token_scale = scale; } __syncthreads(); float inv_s = 
(s_token_scale == 0.f) ? 0.f : 127.f / s_token_scale; F out_vec; #pragma unroll for (int i = 0; i < VEC_SIZE; ++i) out_vec.data[i] = float_to_int8_rn(norm_out.data[i] * inv_s); constexpr float qmax = 127.0f; constexpr float min_scale = 1.19209e-07f; ((F*)result)[idx] = out_vec; if constexpr (update_input) ((P*)norm_res)[idx] = norm_out; if (threadIdx.x == 0) scales[tidx] = fmaxf(scale/qmax, min_scale); } } } template __global__ void __launch_bounds__(512, 1) cross_device_reduce_2stage(RankData* _dp, RankSignals sg, Signal* self_sg, T* __restrict__ result, int rank, int size) { int tid = blockIdx.x * blockDim.x + threadIdx.x; int stride = gridDim.x * blockDim.x; using P = typename packed_t::P; using A = typename packed_t::A; int part = size / ngpus; int start = rank * part; int end = rank == ngpus - 1 ? size : start + part; int largest_part = part + size % ngpus; const P* ptrs[ngpus]; P* tmps[ngpus]; #pragma unroll for (int i = 0; i < ngpus; i++) { int target = (rank + i) % ngpus; ptrs[i] = (const P*)_dp->ptrs[target]; tmps[i] = get_tmp_buf

(sg.signals[target]); } auto tmp_out = tmps[0]; barrier_at_start(sg, self_sg, rank); // stage 1: reduce scatter for (int idx = start + tid; idx < end; idx += stride) { tmp_out[idx - start] = packed_reduce(ptrs, idx); } barrier_at_end(sg, self_sg, rank); // stage 2: allgather. Note: it's important to match the tid between // the two stages, because visibility across devices is only guaranteed // between threads that have the same tid. If thread i computes the sum of // start + i in the first stage, then thread i also gathers start + i from // all ranks. for (int idx = tid; idx < largest_part; idx += stride) { #pragma unroll for (int i = 0; i < ngpus; i++) { int gather_from_rank = ((rank + i) % ngpus); if (gather_from_rank == ngpus - 1 || idx < part) { int dst_idx = gather_from_rank * part + idx; ((P*)result)[dst_idx] = tmps[i][idx]; } } } } template __global__ void __launch_bounds__(512, 1) cross_device_reduce_1stage_pcie(RankData* _dp, RankSignals sg, Signal* self_sg, T* __restrict__ result, int rank, int size, uint32_t** curr_hdp_reg, int world_size) { using P = typename packed_t::P; using A = typename packed_t::A; // note: we don't reorder the address so the accumulation order is the same // for all ranks, ensuring bitwise identical results auto dp = *_dp; if (threadIdx.x == 1) { for(int i = 0; i < world_size; i++) { __atomic_store_n(curr_hdp_reg[i], 0x1, __ATOMIC_RELAXED); } } barrier_at_start(sg, self_sg, rank); // do the actual reduction for (int idx = blockIdx.x * blockDim.x + threadIdx.x; idx < size; idx += gridDim.x * blockDim.x) { ((P*)result)[idx] = packed_reduce((const P**)&dp.ptrs[0], idx); } barrier_at_end(sg, self_sg, rank); } template __global__ void __launch_bounds__(512, 1) cross_device_reduce_2stage_pcie(RankData* _dp, RankSignals sg, Signal* self_sg, T* __restrict__ result, int rank, int size, uint32_t** curr_hdp_reg, int world_size) { int tid = blockIdx.x * blockDim.x + threadIdx.x; int stride = gridDim.x * blockDim.x; using P = typename 
packed_t::P; using A = typename packed_t::A; int part = size / ngpus; int start = rank * part; int end = rank == ngpus - 1 ? size : start + part; int largest_part = part + size % ngpus; const P* ptrs[ngpus]; P* tmps[ngpus]; if (threadIdx.x == 1) { for(int i = 0; i < world_size; i++) { __atomic_store_n(curr_hdp_reg[i], 0x1, __ATOMIC_RELAXED); } } #pragma unroll for (int i = 0; i < ngpus; i++) { int target = (rank + i) % ngpus; ptrs[i] = (const P*)_dp->ptrs[target]; tmps[i] = get_tmp_buf

(sg.signals[target]); } auto tmp_out = tmps[0]; barrier_at_start(sg, self_sg, rank); // stage 1: reduce scatter for (int idx = start + tid; idx < end; idx += stride) { tmp_out[idx - start] = packed_reduce(ptrs, idx); } barrier_at_end(sg, self_sg, rank); // stage 2: allgather. Note: it's important to match the tid between // the two stages, because visibility across devices is only guaranteed // between threads that have the same tid. If thread i computes the sum of // start + i in the first stage, then thread i also gathers start + i from // all ranks. for (int idx = tid; idx < largest_part; idx += stride) { #pragma unroll for (int i = 0; i < ngpus; i++) { int gather_from_rank = ((rank + i) % ngpus); if (gather_from_rank == ngpus - 1 || idx < part) { int dst_idx = gather_from_rank * part + idx; ((P*)result)[dst_idx] = tmps[i][idx]; } } } } using IPC_KEY = std::array; static_assert(sizeof(IPC_KEY) == sizeof(cudaIpcMemHandle_t)); static_assert(alignof(IPC_KEY) == alignof(cudaIpcMemHandle_t)); class CustomAllreduce { public: int rank_; int world_size_; // Full NVLink or xGMI connection between GPUs. bool fully_connected_; RankSignals sg_; // Stores a map from a pointer to its peer pointers from all ranks. std::unordered_map buffers_; Signal* self_sg_; // Stores rank data from all ranks. This is mainly for cuda graph purposes. // For cuda graph to work, all kernel arguments must be fixed during graph // capture time. However, the peer pointers are not known during graph // capture time. Therefore, during capture, we increment the rank data // pointer and use that as the argument to the kernel. The kernel arguments // are stored in graph_unreg_buffers_. The actual peer pointers will be // filled in at the memory pointed to by the pointers in // graph_unreg_buffers_ when the IPC handles are exchanged between ranks. // // The overall process looks like this: // 1. Graph capture. // 2. 
Each rank obtains the IPC handles for each addresses used during cuda // graph capture using get_graph_buffer_ipc_meta. // 3. (In Python) all gather the IPC handles. // 4. Obtain the peer pointers by opening the IPC handles, and store them in // the rank data array at corresponding positions. RankData *d_rank_data_base_, *d_rank_data_end_; std::vector graph_unreg_buffers_; // a map from IPC handles to opened IPC pointers std::map ipc_handles_; uint32_t** dev_curr_hdp_reg; hipEvent_t stopEvent; /** * Signals are an array of ipc-enabled buffers from all ranks. * For each of the buffer, the layout is as follows: * | -- sizeof(Signal) -- | ------ a few MB ----- | * The first section is for allreduce synchronization, and the second * section is for storing the intermediate results required by some * allreduce algos. * * Note: this class does not own any device memory. Any required buffers * are passed in from the constructor. */ CustomAllreduce(Signal** signals, void* rank_data, size_t rank_data_sz, int rank, int world_size, bool fully_connected = true) : rank_(rank), world_size_(world_size), fully_connected_(fully_connected), self_sg_(signals[rank]), d_rank_data_base_(reinterpret_cast(rank_data)), d_rank_data_end_(d_rank_data_base_ + rank_data_sz / sizeof(RankData)) { for (int i = 0; i < world_size_; i++) { sg_.signals[i] = signals[i]; } if (!fully_connected) { cudaMalloc((void**)&dev_curr_hdp_reg, world_size_ * sizeof(uint32_t*)); for (int i = 0; i < world_size_; ++i) { hipDeviceGetAttribute((int*)&dev_curr_hdp_reg[i], hipDeviceAttributeHdpMemFlushCntl, i); } } cudaEventCreate(&stopEvent); } char* open_ipc_handle(const void* ipc_handle) { auto [it, new_handle] = ipc_handles_.insert({*((IPC_KEY*)ipc_handle), nullptr}); if (new_handle) { char* ipc_ptr; CUDACHECK(cudaIpcOpenMemHandle((void**)&ipc_ptr, *((const cudaIpcMemHandle_t*)ipc_handle), cudaIpcMemLazyEnablePeerAccess)); it->second = ipc_ptr; } return it->second; } std::pair> get_graph_buffer_ipc_meta() { auto 
num_buffers = graph_unreg_buffers_.size(); auto handle_sz = sizeof(cudaIpcMemHandle_t); std::string handles(handle_sz * num_buffers, static_cast(0)); std::vector offsets(num_buffers); for (int i = 0; i < num_buffers; i++) { auto ptr = graph_unreg_buffers_[i]; void* base_ptr; // note: must share the base address of each allocation, or we get wrong // address if (cuPointerGetAttribute(&base_ptr, rangeStartAddrAttr, (CUdeviceptr)ptr) != CUDA_SUCCESS) throw std::runtime_error("failed to get pointer attr"); CUDACHECK(cudaIpcGetMemHandle( (cudaIpcMemHandle_t*)&handles[i * handle_sz], base_ptr)); offsets[i] = ((char*)ptr) - ((char*)base_ptr); } return std::make_pair(handles, offsets); } void check_rank_data_capacity(size_t num = 1) { if (d_rank_data_base_ + num > d_rank_data_end_) throw std::runtime_error( "Rank data buffer is overflowed by " + std::to_string(d_rank_data_base_ + num - d_rank_data_end_)); } /** * Register already-shared IPC pointers. */ void register_buffer(void** ptrs) { check_rank_data_capacity(); RankData data; for (int i = 0; i < world_size_; i++) { data.ptrs[i] = ptrs[i]; } auto d_data = d_rank_data_base_++; CUDACHECK( cudaMemcpy(d_data, &data, sizeof(RankData), cudaMemcpyHostToDevice)); buffers_[ptrs[rank_]] = d_data; } // Note: when registering graph buffers, we intentionally choose to not // deduplicate the addresses. That means if the allocator reuses some // addresses, they will be registered again. This is to account for the // remote possibility of different allocation patterns between ranks. For // example, rank 1 may get the same input address for the second allreduce, // but rank 2 got a different address. IPC handles have internal reference // counting mechanism so overhead should be small. 
// Registers every buffer recorded during graph capture.
  //
  // For each recorded buffer a RankData entry is filled in: this rank's slot
  // gets the locally recorded pointer, every other rank's slot gets the
  // pointer obtained by opening that rank's IPC handle and adding that rank's
  // offset. All entries are then copied to the device in one cudaMemcpy,
  // d_rank_data_base_ is advanced past them and the pending list is cleared.
  //
  // handles[j] is indexed in steps of sizeof(cudaIpcMemHandle_t) and
  // offsets[j][i] is added to the opened pointer, for rank j and buffer i.
  //
  // NOTE(review): template argument lists (e.g. on std::vector) are missing in
  // this view of the file; the tokens below are kept exactly as found.
  void register_graph_buffers(
      const std::vector& handles,
      const std::vector>& offsets) {
    auto num_buffers = graph_unreg_buffers_.size();
    check_rank_data_capacity(num_buffers);
    std::vector rank_data(num_buffers);
    for (int i = 0; i < num_buffers; i++) {
      auto self_ptr = graph_unreg_buffers_[i];
      auto& rd = rank_data[i];
      for (int j = 0; j < world_size_; j++) {
        if (j != rank_) {
          char* handle =
              open_ipc_handle(&handles[j][i * sizeof(cudaIpcMemHandle_t)]);
          handle += offsets[j][i];
          rd.ptrs[j] = handle;
        } else {
          rd.ptrs[j] = self_ptr;
        }
      }
    }
    CUDACHECK(cudaMemcpy(d_rank_data_base_, rank_data.data(),
                         sizeof(RankData) * num_buffers,
                         cudaMemcpyHostToDevice));
    d_rank_data_base_ += num_buffers;
    graph_unreg_buffers_.clear();
  }

  // Launches one of the *_pcie allreduce kernels on `stream`.
  //
  // - `size` is the element count; it must be a multiple of the packed width
  //   `d`, otherwise std::runtime_error is thrown. It is divided by `d`
  //   before being passed to the kernel.
  // - `block_limit` must not exceed kMaxBlocks (std::runtime_error otherwise).
  // - While `stream` is being captured, `input` is appended to
  //   graph_unreg_buffers_ and the kernel is given the RankData slot it will
  //   occupy once register_graph_buffers runs. Otherwise `input` must already
  //   be present in buffers_, or std::runtime_error is thrown.
  // - Kernel choice: the 1-stage kernel for world size 2, and for world sizes
  //   up to 4 / up to 8 when `bytes` is below 128*8192 / 8*8192; the 2-stage
  //   kernel otherwise (so world size 16 always takes the 2-stage kernel).
  //   Unlike the other launchers, fully_connected_ is not consulted here.
  //
  // NOTE(review): the template header, the packed_t/reinterpret_cast template
  // arguments and the <<<...>>> launch configuration are missing in this view
  // of the file; the tokens are kept exactly as found. No error check follows
  // the launch.
  template void allreduce_pcie(cudaStream_t stream, T* input, T* output,
                               int size, int threads = 512,
                               int block_limit = defaultBlockLimit) {
    auto d = packed_t::P::size;
    if (size % d != 0)
      throw std::runtime_error(
          "custom allreduce currently requires input length to be multiple "
          "of " +
          std::to_string(d));
    if (block_limit > kMaxBlocks)
      throw std::runtime_error("max supported block limit is " +
                               std::to_string(kMaxBlocks) + ". Got " +
                               std::to_string(block_limit));

    RankData* ptrs;
    cudaStreamCaptureStatus status;
    CUDACHECK(cudaStreamIsCapturing(stream, &status));
    if (status == cudaStreamCaptureStatusActive) {
      // Slot this buffer will get when the captured buffers are registered.
      ptrs = d_rank_data_base_ + graph_unreg_buffers_.size();
      graph_unreg_buffers_.push_back(input);
    } else {
      auto it = buffers_.find(input);
      if (it == buffers_.end())
        throw std::runtime_error(
            "buffer address " +
            std::to_string(reinterpret_cast(input)) +
            " is not registered!");
      ptrs = it->second;
    }

    // From here on `size` counts packed elements, not scalars.
    size /= d;
    auto bytes = size * sizeof(typename packed_t::P);
    int blocks = std::min(block_limit, (size + threads - 1) / threads);
#define KL(ngpus, name)                                  \
  name<<>>(ptrs, sg_, self_sg_, output,                  \
           rank_, size, dev_curr_hdp_reg, world_size_) ;
#define REDUCE_CASE(ngpus)                                 \
  case ngpus: {                                            \
    if (world_size_ == 2) {                                \
      KL(ngpus, cross_device_reduce_1stage_pcie);          \
    } else {                                               \
      if ((world_size_ <= 4 && bytes < 128 * 8192) ||      \
          (world_size_ <= 8 && bytes < 8 * 8192)) {        \
        KL(ngpus, cross_device_reduce_1stage_pcie);        \
      } else {                                             \
        KL(ngpus, cross_device_reduce_2stage_pcie);        \
      }                                                    \
    }                                                      \
    break;                                                 \
  }

    switch (world_size_) {
      REDUCE_CASE(2)
      REDUCE_CASE(4)
      REDUCE_CASE(6)
      REDUCE_CASE(8)
      REDUCE_CASE(16)
      default:
        throw std::runtime_error(
            "custom allreduce only supports num gpus in (2,4,6,8,16). Actual "
            "num "
            "gpus = " +
            std::to_string(world_size_));
    }
#undef REDUCE_CASE
#undef KL
  }

  /**
   * Performs allreduce, assuming input has already been registered.
   *
   * Block and grid default configs are results after careful grid search.
   * Using 36 blocks give the best or close to the best runtime on the devices
   * I tried: A100, A10, A30, T4, V100. You'll notice that NCCL kernels also
   * only take a small amount of SMs. Not quite sure the underlying reason,
   * but my guess is that too many SMs will cause contention on NVLink bus.
*/
  // Launches cross_device_reduce_2stage_fuse_norm on `stream`.
  //
  // - `hidden_dim` must be a multiple of the packed width `d` and
  //   `block_limit` must not exceed kMaxBlocks; std::runtime_error otherwise.
  // - Buffer lookup follows the same capture / registered-buffer logic as the
  //   other launchers in this class.
  // - Tokens are split across ranks: every rank gets token_num / ngpus tokens
  //   and the first (token_num % ngpus) ranks get one extra. begin_tokens[i]
  //   is the first token of rank i, token_num_per_ranks[i] its count.
  // - grid_size is min(kMaxBlocks, largest per-rank token count) and
  //   threads_in_block is hidden_dim / d.
  // - All three branches of REDUCE_CASE launch the same kernel. When
  //   world_size_ > 2 and fully_connected_ is false, nothing is launched.
  //
  // NOTE(review): presumably an RMS-norm fusion given `rms_weight`/`eps`;
  // confirm against the kernel. The template header, template arguments and
  // the <<<...>>> launch configuration are missing in this view of the file,
  // so it cannot be told from here whether `threads` is used. Tokens are kept
  // exactly as found. No error check follows the launch.
  template void allreduce_fuse_norm(cudaStream_t stream, T* input, T* output,
                                    int size, int token_num, int hidden_dim,
                                    T* residual, T* rms_weight, double eps,
                                    int threads = 512,
                                    int block_limit = defaultBlockLimit) {
    auto d = packed_t::P::size;
    if (hidden_dim % d != 0)
      throw std::runtime_error(
          "custom allreduce currently requires input length to be multiple "
          "of " +
          std::to_string(d));
    if (block_limit > kMaxBlocks)
      throw std::runtime_error("max supported block limit is " +
                               std::to_string(kMaxBlocks) + ". Got " +
                               std::to_string(block_limit));

    RankData* ptrs;
    cudaStreamCaptureStatus status;
    CUDACHECK(cudaStreamIsCapturing(stream, &status));
    if (status == cudaStreamCaptureStatusActive) {
      // Slot this buffer will get when the captured buffers are registered.
      ptrs = d_rank_data_base_ + graph_unreg_buffers_.size();
      graph_unreg_buffers_.push_back(input);
    } else {
      auto it = buffers_.find(input);
      if (it == buffers_.end())
        throw std::runtime_error(
            "buffer address " +
            std::to_string(reinterpret_cast(input)) +
            " is not registered!");
      ptrs = it->second;
    }

    int block_num = token_num;
    // KL declares locals, so it is only ever expanded inside its own braces.
#define KL(ngpus, name)                                                                      \
  std::array begin_tokens, token_num_per_ranks;                                              \
  int remaining_token = token_num % ngpus;                                                   \
  int token_num_per_rank = token_num / ngpus;                                                \
  block_num = token_num_per_rank;                                                            \
  if (remaining_token)                                                                       \
    block_num++;                                                                             \
  for (int i = 0; i < ngpus; ++i) {                                                          \
    begin_tokens[i] = i * token_num_per_rank + (remaining_token > i ? i : remaining_token);  \
    token_num_per_ranks[i] = token_num_per_rank + (remaining_token > i ? 1 : 0);             \
  }                                                                                          \
  int thread_per_token = hidden_dim / d;                                                     \
  int grid_size = std::min(kMaxBlocks, block_num);                                           \
  int threads_in_block = thread_per_token;                                                   \
  name<<>>(ptrs, sg_, self_sg_, output,                                                      \
           rank_, size, hidden_dim, residual,                                                \
           rms_weight, eps, begin_tokens, token_num_per_ranks);
#define REDUCE_CASE(ngpus)                                  \
  case ngpus: {                                             \
    if (world_size_ == 2) {                                 \
      KL(ngpus, cross_device_reduce_2stage_fuse_norm);      \
    } else if (fully_connected_) {                          \
      if ((world_size_ <= 4) ||                             \
          (world_size_ <= 8 )) {                            \
        KL(ngpus, cross_device_reduce_2stage_fuse_norm);    \
      } else {                                              \
        KL(ngpus, cross_device_reduce_2stage_fuse_norm);    \
      }                                                     \
    }                                                       \
    break;                                                  \
  }

    switch (world_size_) {
      REDUCE_CASE(2)
      REDUCE_CASE(4)
      REDUCE_CASE(6)
      REDUCE_CASE(8)
      default:
        throw std::runtime_error(
            "custom allreduce only supports num gpus in (2,4,6,8). Actual "
            "num "
            "gpus = " +
            std::to_string(world_size_));
    }
#undef REDUCE_CASE
#undef KL
  }

  // Launches a fused allreduce + norm + quantization kernel on `stream`.
  //
  // - Same `hidden_dim` / `block_limit` validation and buffer lookup as
  //   allreduce_fuse_norm.
  // - `bytes` is (size / d) packed elements times the packed element size.
  // - Kernel choice: cross_device_reduce_1stage_norm_quant (via KL) for world
  //   size 2, and for fully connected world sizes up to 4 / up to 8 when
  //   `bytes` is below 1024*1024 / 512*1024; otherwise
  //   cross_device_reduce_2stage_fuse_norm_quant (via KL1), which also gets
  //   the per-rank token split computed the same way as in
  //   allreduce_fuse_norm. When world_size_ > 2 and fully_connected_ is
  //   false, nothing is launched.
  //
  // NOTE(review): KL1 is defined here but never #undef'd, so it stays defined
  // for the rest of the translation unit. The template header, template
  // arguments and both <<<...>>> launch configurations are missing in this
  // view of the file; tokens are kept exactly as found. No error check
  // follows the launches.
  template void allreduce_fuse_norm_quant(cudaStream_t stream,
                                          scalar_in_t* input,
                                          scalar_out_t* output, int size,
                                          int token_num, int hidden_dim,
                                          scalar_in_t* residual,
                                          scalar_in_t* rms_weight,
                                          scalar_in_t* norm_out, double eps,
                                          float* scales, int threads = 512,
                                          int block_limit = defaultBlockLimit) {
    auto d = packed_t::P::size;
    if (hidden_dim % d != 0)
      throw std::runtime_error(
          "custom allreduce currently requires input length to be multiple "
          "of " +
          std::to_string(d));
    if (block_limit > kMaxBlocks)
      throw std::runtime_error("max supported block limit is " +
                               std::to_string(kMaxBlocks) + ". Got " +
                               std::to_string(block_limit));

    RankData* ptrs;
    cudaStreamCaptureStatus status;
    CUDACHECK(cudaStreamIsCapturing(stream, &status));
    if (status == cudaStreamCaptureStatusActive) {
      // Slot this buffer will get when the captured buffers are registered.
      ptrs = d_rank_data_base_ + graph_unreg_buffers_.size();
      graph_unreg_buffers_.push_back(input);
    } else {
      auto it = buffers_.find(input);
      if (it == buffers_.end())
        throw std::runtime_error(
            "buffer address " +
            std::to_string(reinterpret_cast(input)) +
            " is not registered!");
      ptrs = it->second;
    }

    int block_num = token_num;
    int thread_per_token = hidden_dim / d;
    auto bytes = (size / d) * sizeof(typename packed_t::P);
    // KL1 declares locals, so it is only ever expanded inside its own braces.
#define KL1(ngpus, name)                                                                     \
  std::array begin_tokens, token_num_per_ranks;                                              \
  int remaining_token = token_num % ngpus;                                                   \
  int token_num_per_rank = token_num / ngpus;                                                \
  block_num = token_num_per_rank;                                                            \
  if (remaining_token)                                                                       \
    block_num++;                                                                             \
  for (int i = 0; i < ngpus; ++i) {                                                          \
    begin_tokens[i] = i * token_num_per_rank + (remaining_token > i ? i : remaining_token);  \
    token_num_per_ranks[i] = token_num_per_rank + (remaining_token > i ? 1 : 0);             \
  }                                                                                          \
  int grid_size = std::min(kMaxBlocks, block_num);                                           \
  int threads_in_block = thread_per_token;                                                   \
  name<<>>(ptrs, sg_,                                                                        \
           self_sg_, output, rank_, size, hidden_dim, residual,                              \
           rms_weight, scales, eps, norm_out, begin_tokens, token_num_per_ranks);
#define KL(ngpus, name)                                                   \
  name<<>>(ptrs, sg_,                                                     \
           self_sg_, output, rank_, size, hidden_dim, residual, rms_weight, \
           scales, eps, norm_out);
#define REDUCE_CASE(ngpus)                                          \
  case ngpus: {                                                     \
    if (world_size_ == 2) {                                         \
      KL(ngpus, cross_device_reduce_1stage_norm_quant);             \
    } else if (fully_connected_) {                                  \
      if ((world_size_ <= 4 && bytes < 1024 * 1024) ||              \
          (world_size_ <= 8 && bytes < 512 * 1024)) {               \
        KL(ngpus, cross_device_reduce_1stage_norm_quant);           \
      } else {                                                      \
        KL1(ngpus, cross_device_reduce_2stage_fuse_norm_quant);     \
      }                                                             \
    }                                                               \
    break;                                                          \
  }

    switch (world_size_) {
      REDUCE_CASE(2)
      REDUCE_CASE(4)
      REDUCE_CASE(6)
      REDUCE_CASE(8)
      default:
        throw std::runtime_error(
            "custom allreduce only supports num gpus in (2,4,6,8). Actual "
            "num "
            "gpus = " +
            std::to_string(world_size_));
    }
#undef REDUCE_CASE
#undef KL
  }

  // Launches the plain allreduce kernel on `stream` through
  // hipExtLaunchKernel, passing stopEvent as the stop event.
  //
  // - `size` is the element count; it must be a multiple of the packed width
  //   `d` (std::runtime_error otherwise) and is divided by `d` before being
  //   handed to the kernel.
  // - `block_limit` must not exceed kMaxBlocks (std::runtime_error otherwise).
  // - Buffer lookup follows the same capture / registered-buffer logic as the
  //   other launchers in this class.
  // - Kernel choice: cross_device_reduce_1stage for world size 2, and for
  //   fully connected world sizes up to 4 / up to 8 when `bytes` is below
  //   512*1024 / 256*1024; cross_device_reduce_2stage otherwise. When
  //   world_size_ > 2 and fully_connected_ is false, nothing is launched.
  //
  // NOTE(review): the template header and template arguments (including those
  // on the kernel name passed to hipExtLaunchKernel) are missing in this view
  // of the file; tokens are kept exactly as found. The return value of
  // hipExtLaunchKernel is not checked.
  template void allreduce(cudaStream_t stream, T* input, T* output, int size,
                          int threads = 512,
                          int block_limit = defaultBlockLimit) {
    auto d = packed_t::P::size;
    if (size % d != 0)
      throw std::runtime_error(
          "custom allreduce currently requires input length to be multiple "
          "of " +
          std::to_string(d));
    if (block_limit > kMaxBlocks)
      throw std::runtime_error("max supported block limit is " +
                               std::to_string(kMaxBlocks) + ". Got " +
                               std::to_string(block_limit));

    RankData* ptrs;
    cudaStreamCaptureStatus status;
    CUDACHECK(cudaStreamIsCapturing(stream, &status));
    if (status == cudaStreamCaptureStatusActive) {
      // Slot this buffer will get when the captured buffers are registered.
      ptrs = d_rank_data_base_ + graph_unreg_buffers_.size();
      graph_unreg_buffers_.push_back(input);
    } else {
      auto it = buffers_.find(input);
      if (it == buffers_.end())
        throw std::runtime_error(
            "buffer address " +
            std::to_string(reinterpret_cast(input)) +
            " is not registered!");
      ptrs = it->second;
    }

    // From here on `size` counts packed elements, not scalars.
    size /= d;
    auto bytes = size * sizeof(typename packed_t::P);
    int blocks = std::min(block_limit, (size + threads - 1) / threads);
    // Earlier launch form, kept for reference (disabled):
    //   #define KL(ngpus, name)
    //     name<<>>(ptrs, sg_, self_sg_, output,
    //              rank_, size);
#define KL(ngpus, name)                              \
  {                                                  \
    void* kernelArgs[] = {                           \
      &ptrs, &sg_, &self_sg_, &output, &rank_, &size \
    };                                               \
    hipExtLaunchKernel(                              \
      (void*)name,                                   \
      blocks, threads,                               \
      kernelArgs, 0,                                 \
      stream, nullptr, stopEvent, 0                  \
    );                                               \
  }

#define REDUCE_CASE(ngpus)                              \
  case ngpus: {                                         \
    if (world_size_ == 2) {                             \
      KL(ngpus, cross_device_reduce_1stage);            \
    } else if (fully_connected_) {                      \
      if ((world_size_ <= 4 && bytes < 512 * 1024) ||   \
          (world_size_ <= 8 && bytes < 256 * 1024)) {   \
        KL(ngpus, cross_device_reduce_1stage);          \
      } else {                                          \
        KL(ngpus, cross_device_reduce_2stage);          \
      }                                                 \
    }                                                   \
    break;                                              \
  }

    switch (world_size_) {
      REDUCE_CASE(2)
      REDUCE_CASE(4)
      REDUCE_CASE(6)
      REDUCE_CASE(8)
      default:
        throw std::runtime_error(
            "custom allreduce only supports num gpus in (2,4,6,8). Actual "
            "num "
            "gpus = " +
            std::to_string(world_size_));
    }
#undef REDUCE_CASE
#undef KL
  }

  // Closes every IPC mapping opened through open_ipc_handle, then releases
  // the HDP register table and the stop event.
  //
  // NOTE(review): cudaFree is called unconditionally, so this relies on
  // dev_curr_hdp_reg being either null or a cudaMalloc result on every
  // construction path - confirm against the declaration and constructor.
  // The results of cudaFree and cudaEventDestroy are not checked, and
  // CUDACHECK terminates the process if closing a handle fails.
  ~CustomAllreduce() {
    for (auto [_, ptr] : ipc_handles_) {
      CUDACHECK(cudaIpcCloseMemHandle(ptr));
    }
    cudaFree(dev_curr_hdp_reg);
    cudaEventDestroy(stopEvent);
  }
};
/**
 * To inspect PTX/SASS, copy paste this header file to compiler explorer and
 * add a template instantiation:
 * template void vllm::CustomAllreduce::allreduce(cudaStream_t, half *, half *,
 * int, int, int);
 */
}  // namespace vllm