Unverified Commit 7f943a26 authored by Li Zhang, committed by GitHub

Unify prefill & decode passes (#775)

* Unify prefill and decode passes

* dynamic split-fuse

* refactor

* correct input count calculation

* remove unused

* lint

* lint

* fix msvc build

* fix msvc build

* fix msvc build

* fix msvc build

* fix msvc build

* fix msvc build

* fix msvc build

* fix msvc build

* fix msvc build
parent 2ba90822
@@ -13,8 +13,8 @@
namespace turbomind {
struct Request {
uint64_t id;         // sequence id
uint64_t unique_id;  // monotonic increasing
bool start_flag;
bool end_flag;
......
@@ -36,7 +36,7 @@ SequenceManager::SequenceManager(size_t layer_num,
const Sequence* SequenceManager::Create(uint64_t id)
{
Sequence sequence{id};
auto it = sequences_.find(id);
if (it != sequences_.end()) {
@@ -152,18 +152,23 @@ struct Schedule {
int last;
int input_count1;
int input_count2;
Sequences active;
std::vector<int> block_counts;
Sequences inactive;
Sequences victims;
Schedule(Snapshot snapshot, int size, int _input_count1, int _input_count2):
free(snapshot.free),
cached(snapshot.cached),
last(size),
use_count_(std::move(snapshot.use_count)),
unlocked_(size),
it_(size),
input_count1(_input_count1),
input_count2(_input_count2)
{
}
@@ -208,6 +213,7 @@ std::ostream& operator<<(std::ostream& os, const Schedule& s)
struct Transaction {
int index_;
int block_count_;
int input_count_;
int allocate_{};
int evict_{};
@@ -218,13 +224,14 @@ struct Transaction {
const Sequences& sequences_;
Schedule& schedule_;
explicit Transaction(const Sequences& sequences, int index, int block_count, int input_count, Schedule& sched):
sequences_(sequences), schedule_(sched), index_(index), block_count_(block_count), input_count_(input_count)
{
}
void Process()
{
if (schedule_.input_count1 > 0) {
int count = block_count_;
int tmp = std::min(schedule_.free, count);
@@ -249,13 +256,13 @@
break;
}
}
if (count == 0) {
return Commit();
}
}
const_cast<Sequence*>(sequences_[index_])->input_length = 0;
schedule_.inactive.push_back(sequences_[index_]);
}
void Commit()
@@ -276,6 +283,13 @@ struct Transaction {
// update active sequences
schedule_.active.push_back(sequences_[index_]);
schedule_.block_counts.push_back(block_count_);
if (input_count_ > schedule_.input_count2) {
input_count_ = schedule_.input_count1;
}
schedule_.input_count1 -= input_count_;
schedule_.input_count2 -= input_count_;
const_cast<Sequence*>(sequences_[index_])->input_length = input_count_;
}
};
@@ -308,6 +322,25 @@ void SequenceManager::SortByPriority(Sequences& sequences,
context_lengths.swap(tmp_lengths);
}
// template<class P, class... Ts>
// void SortByPriority(const std::vector<P>& priorities, Ts&... ranges)
// {
// // sort according to priority
// std::vector<int> idxs(priorities.size());
// std::iota(idxs.begin(), idxs.end(), 0);
// std::sort(idxs.begin(), idxs.end(), [&](int i, int j) {
// return priorities[i] < priorities[j]; //
// });
// auto reorder = [&](auto& src) {
// auto dst = src;
// for (size_t i = 0; i < idxs.size(); ++i) {
// dst[i] = src[idxs[i]];
// }
// src.swap(dst);
// };
// (reorder(ranges), ...);
// }
std::vector<int> SequenceManager::CountRequiredBlocks(const Sequences& sequences,
const std::vector<int>& context_lengths,
int step_length)
@@ -344,7 +377,8 @@ void SequenceManager::AssignAndActivate(const Sequences& sequenc
auto SequenceManager::Materialize(Sequences sequences,
std::vector<int> context_lengths,
const std::vector<uint64_t>& priorities,
int step_length,
AdjustInputCount adjust) -> Outcome
{
////////////////////////////////////////////////////////////////////////////////
/// Schedule the assignment of blocks to sequences
@@ -354,18 +388,23 @@ auto SequenceManager::Materialize(Sequences sequences,
SortByPriority(sequences, context_lengths, priorities);
// SortByPriority(priorities, sequences, context_lengths);
// Verify and lock cache sequences to avoid their blocks being evicted unnoticed
// the blocks can still be preempted later
VerifyAndLockCached(sequences);
auto [input_count1, input_count2] = adjust(sequences, context_lengths);
std::vector<int> required = CountRequiredBlocks(sequences, context_lengths, step_length);
// dbg(required);
Schedule schedule(block_manager_->TakeSnapshot(), sequences.size(), input_count1, input_count2);
// `schedule.last` is decreasing in the loop
for (int i = 0; i < schedule.last; ++i) {
const int input_length = context_lengths[i] - sequences[i]->cache_len;
Transaction{sequences, i, required[i], input_length, schedule}.Process();
}
// mark remaining sequences invalid
......
@@ -3,6 +3,7 @@
#pragma once
#include "src/turbomind/models/llama/BlockManager.h"
#include <functional>
namespace turbomind {
@@ -16,19 +17,23 @@ struct Sequence {
};
uint64_t id;
Status status = kCached;
std::vector<const Block*> blocks;
std::vector<uint64_t> block_unique_ids;
int input_length = 0;
mutable std::vector<int> tokens; // update by user
mutable int cache_len = 0;
// additional data kept round-to-round
mutable std::vector<std::byte> random_state; // update by user
mutable float rope_theta = 0.f;
Sequence(uint64_t _id): id(_id) {}
friend std::ostream& operator<<(std::ostream& os, const Sequence& seq);
};
@@ -74,10 +79,13 @@ public:
int swap_out;
};
using AdjustInputCount = std::function<std::pair<int, int>(const Sequences&, const std::vector<int>&)>;
[[nodiscard]] Outcome Materialize(Sequences sequences,
std::vector<int> context_lengths,
const std::vector<uint64_t>& priorities,
int step_length,
AdjustInputCount adjust);
void* OffsetKey(void* block_ptr)
{
......
// Copyright (c) OpenMMLab. All rights reserved.
#pragma once
#include "src/turbomind/models/llama/llama_kernels.h"
#include "src/turbomind/utils/cuda_utils.h"
namespace turbomind {
class BatchedCopy {
public:
template<class T, std::enable_if_t<alignof(T) <= alignof(uint32_t), int> = 0>
T* Add(const T* src, int size, T* dst)
{
src_.push_back((void*)src);
dst_.push_back((void*)dst);
size_.push_back(sizeof(T) * size);
return dst + size;
}
void Submit(cudaStream_t stream)
{
invokeBatchedCopy(src_.data(), dst_.data(), size_.data(), size_.size(), stream);
sync_check_cuda_error();
src_.clear();
dst_.clear();
size_.clear();
}
private:
std::vector<void*> src_;
std::vector<void*> dst_;
std::vector<int> size_;
};
} // namespace turbomind
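A minimal usage sketch of the helper above (the caller, buffer names, and lengths are hypothetical, not part of this commit): Add() records one copy and returns the destination cursor advanced past it, and Submit() performs all recorded copies in a single invokeBatchedCopy launch before clearing the pending lists.
// Hypothetical caller: gather two device-resident segments into one contiguous buffer.
// All pointers are assumed to be device memory; the element type must satisfy the
// alignment constraint checked by Add().
void gather_two_segments(
    int* d_dst, const int* d_src_a, int len_a, const int* d_src_b, int len_b, cudaStream_t stream)
{
    turbomind::BatchedCopy batched;
    d_dst = batched.Add(d_src_a, len_a, d_dst);  // queue copy #1, cursor advances by len_a
    d_dst = batched.Add(d_src_b, len_b, d_dst);  // queue copy #2
    batched.Submit(stream);  // one batched kernel launch executes both copies
}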
@@ -101,6 +101,8 @@ __device__ T blockReduceSum(const cg::thread_block& block, T value)
return cg::reduce(tile, value, cg::plus<float>{});
}
// r' = r + x
// x' = norm(r') * scales
template<typename T>
__global__ void fusedAddBiasResidualNorm(T* __restrict__ r_data,
T* __restrict__ x_data,
......
@@ -9,11 +9,13 @@
#include "src/turbomind/models/llama/llama_utils.h"
#include "src/turbomind/utils/cuda_type_utils.cuh"
#include "src/turbomind/utils/cuda_utils.h"
#include "src/turbomind/utils/dispatch.h"
#include "src/turbomind/utils/logger.h"
#include <algorithm>
#include <cstdint>
#include <cub/block/block_reduce.cuh>
#include <type_traits>
#include <utility>
namespace turbomind {
@@ -544,8 +546,10 @@ __global__ void gatherOutput(int* output_ids,
}
// skip padding for dst
const int dst_idx = src_idx < context_len ? src_idx : src_idx - (max_context_len - context_len);
if (dst_idx < max_output_len) {
output_ids[dst_idx] = ids[src_idx * batch_size + batch_id];
}
}
}
void invokeGatherOutput(int* output_ids,
@@ -694,8 +698,10 @@ void invokeIndexedCopyImpl(void** h_src_ptr,
int count,
cudaStream_t st)
{
dispatch(  // dispatch for num of copy operations
std::integer_sequence<int, 4, 8, 16, 32, 64, 128, 256>{},
[&](auto C) { return count <= C; },
[&](auto C) {
// maximum parameter size: sm<70: 4kB, sm>=70: 32kB
static_assert(sizeof(IndexedCopyParam<N, C>) <= 4096);
IndexedCopyParam<N, C> param{};
@@ -711,33 +717,12 @@ void invokeIndexedCopyImpl(void** h_src_ptr,
return src ? (void)std::copy_n(src + offset, n, dst) : std::iota(dst, dst + n, offset);
};
for (int c = 0; c < count; c += C) {
int batch_size = std::min(count - c, (int)C);
copy_idx(h_src_idx, c, batch_size, param.src_idx.data());
copy_idx(h_dst_idx, c, batch_size, param.dst_idx.data());
indexedCopy<T><<<batch_size, 128, 0, st>>>(param);
}
});
}
void invokeIndexedCopy(void** h_src_ptr,
@@ -749,19 +734,14 @@ void invokeIndexedCopy(void** h_src_ptr,
int n_copys,
cudaStream_t st)
{
auto success = dispatch(std::integer_sequence<int, 1, 2, 3, 4>{}, [&](auto N) {
if (N == n_copys) {
invokeIndexedCopyImpl<uint32_t, N>(h_src_ptr, h_dst_ptr, h_elem_sz, h_src_idx, h_dst_idx, count, st);
return true;
}
return false;
});
FT_CHECK(success);
}
__global__ void padLastTokenIds(int* token_ids, const int* context_length, int max_context_len, int batch_size)
@@ -777,6 +757,96 @@ void invokePadLastTokenIds(
padLastTokenIds<<<1, 512, 0, stream>>>(token_ids, context_length, max_context_len, batch_size);
}
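// gathers, for each sequence, the feature vector of its last token from the packed
// [token, dims] input; cu_seqlens[bi + 1] - 1 is the index of sequence bi's last token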
template<typename T>
__global__ void getFeatureOfLastToken(T* output, const T* input, const int* cu_seqlens, int dims)
{
int bi = blockIdx.x;
int ti = cu_seqlens[bi + 1] - 1;
for (int i = threadIdx.x; i < dims; i += blockDim.x) {
output[dims * bi + i] = input[dims * ti + i];
}
}
template<typename T>
void invokeGetFeatureOfLastToken(
T* output, const T* input, const int* cu_seqlens, int dims, int batch_size, cudaStream_t stream)
{
getFeatureOfLastToken<<<batch_size, 256, 0, stream>>>(output, input, cu_seqlens, dims);
}
template void invokeGetFeatureOfLastToken(half*, const half*, const int*, int, int, cudaStream_t);
template void invokeGetFeatureOfLastToken(float*, const float*, const int*, int, int, cudaStream_t);
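// batched element-wise copy: each queued copy is serviced by kThrPerCpy threads of a
// 128-thread block; the dispatch in invokeBatchedCopy below picks kThrPerCpy from the
// largest copy size in the batch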
template<class T, int C>
struct BatchedCopyParam {
Array<T*, C> src_ptr;
Array<T*, C> dst_ptr;
Array<int, C> size;
int count;
};
template<int kThrPerCpy, class T, int C>
__global__ void batchedCopy(BatchedCopyParam<T, C> param)
{
const int ti = threadIdx.x + blockIdx.x * blockDim.x;
const int bi = ti / kThrPerCpy;
if (bi >= param.count) {
return;
}
const T* __restrict__ src = param.src_ptr[bi];
T* __restrict__ dst = param.dst_ptr[bi];
int size = param.size[bi];
for (int i = ti % kThrPerCpy; i < size; i += kThrPerCpy) {
dst[i] = src[i];
}
}
// MSVC does not like CUDA kernel launch inside nested lambdas
template<class P>
struct BatchedCopyLauncher {
int max_size;
int count;
const P* params;
cudaStream_t st;
template<int S>
void operator()(std::integral_constant<int, S>) const
{
constexpr int threads = 128;
constexpr int items_per_block = threads / S;
const int blocks = (count + items_per_block - 1) / items_per_block;
batchedCopy<S><<<blocks, threads, 0, st>>>(*params);
}
};
void invokeBatchedCopy(void** src_ptr, void** dst_ptr, int* size, int count, cudaStream_t st)
{
dispatch(
std::integer_sequence<int, 1, 8, 32, 128>{},
[&](auto C) { return count <= C; },
[&](auto C) {
using T = uint32_t;
BatchedCopyParam<T, C> params{};
// TODO: on CUDA 12.1 and sm_70+ this can be 32K
static_assert(sizeof(params) <= 4096);
for (int c = 0; c < count; c += C) {
const int bsz = std::min<int>(count - c, C);
params.count = bsz;
for (int i = 0; i < bsz; ++i) {
params.src_ptr[i] = (T*)src_ptr[c + i];
params.dst_ptr[i] = (T*)dst_ptr[c + i];
FT_CHECK(size[c + i] % sizeof(T) == 0);
params.size[i] = size[c + i] / sizeof(T);
}
const int max_size = *std::max_element(params.size.begin(), params.size.end());
dispatch(
std::integer_sequence<int, 1, 2, 4, 8, 16, 32, 64, 128>{},
[&](auto S) { return max_size <= S; },
BatchedCopyLauncher<BatchedCopyParam<T, C>>{max_size, count, &params, st});
}
});
}
#define VERSION_SWITCH(VERSION, CONST_NAME, ...) \
[&] { \
if (VERSION == 2) { \
......
@@ -105,6 +105,8 @@ void invokeIndexedCopy(void** h_src_ptr,
int n_copys,
cudaStream_t st);
void invokeBatchedCopy(void** src_ptr, void** dst_ptr, int* size, int count, cudaStream_t st);
// ABCDe ABCDe e // ABCDe ABCDe e
// ABCDEFGHIJk ABCDEFGHIJk // ABCDEFGHIJk ABCDEFGHIJk
// ABCDEFGHi -> ABCDEFGHi i // ABCDEFGHi -> ABCDEFGHi i
@@ -113,6 +115,10 @@ void invokeIndexedCopy(void** h_src_ptr,
void invokePadLastTokenIds(
int* token_ids, const int* context_length, int max_context_len, int batch_size, cudaStream_t stream);
template<typename T>
void invokeGetFeatureOfLastToken(
T* output, const T* input, const int* cu_seqlens, int dims, int batch_size, cudaStream_t stream);
void invokeMyCopyInt(int* dst, const int* src, size_t count, cudaStream_t st);
template<typename T>
......
@@ -13,4 +13,21 @@ struct LlamaAttentionParams {
bool use_logn_attn;
};
struct EngineParams {
// batch params
int max_batch_size;
int session_len;
int step_length;
// cache params
float cache_max_block_count;
int cache_chunk_size;
// chunking params
int max_context_token_num;
int num_tokens_per_iter;
int extra_tokens_per_iter;
int max_prefill_iters;
};
} // namespace turbomind
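For orientation, a hypothetical sketch (not code from this PR) of how the chunking parameters above could be mapped to the two token budgets returned by a SequenceManager::AdjustInputCount callback, following the input_count1/input_count2 naming used in Schedule:
// Hypothetical wiring of EngineParams to Materialize()'s AdjustInputCount argument;
// the real LlamaBatch policy may differ (e.g. it may subtract tokens already reserved
// for decoding sequences). `engine_params` is an assumed EngineParams instance.
SequenceManager::AdjustInputCount adjust =
    [&](const Sequences& seqs, const std::vector<int>& context_lengths) {
        (void)seqs, (void)context_lengths;
        int soft = engine_params.num_tokens_per_iter;                                        // per-iteration budget
        int hard = engine_params.num_tokens_per_iter + engine_params.extra_tokens_per_iter;  // absolute cap
        return std::make_pair(soft, hard);
    };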
@@ -25,17 +25,26 @@
#include "src/turbomind/models/llama/LlamaLinear.h"
#include "src/turbomind/models/llama/llama_params.h"
#include "src/turbomind/utils/Tensor.h"
#include "src/turbomind/utils/cuda_utils.h"
#include "src/turbomind/utils/nccl_utils.h"
namespace turbomind {
template<typename T>
class UnifiedAttentionLayer {
public:
using WeightType = LlamaAttentionWeight<T>;
static constexpr int kDecodeMaxSplits = 16;
void freeBuffer();
void allocateBuffer(size_t num_token,
size_t pf_batch_size,
size_t pf_max_q_len,
size_t pf_max_k_len,
size_t dc_batch_size,
size_t dc_max_split_k);
UnifiedAttentionLayer(size_t head_num,
size_t kv_head_num,
size_t size_per_head,
LlamaAttentionParams attn_params,
@@ -65,11 +74,50 @@ public:
quant_policy_(quant_policy)
{
FT_CHECK(head_num % kv_head_num == 0);
arch_ = getSMVersion();
}
void forward(TensorMap* outputs, const TensorMap* inputs, const LlamaAttentionWeight<T>* weights);
void prefill(T* output,
const T* qkv,
void** k_cache_ptrs,
void** v_cache_ptrs,
const T* attention_mask,
const int* cu_seqlens,
const int* padding_offset,
T** tmp_k_ptrs,
T** tmp_v_ptrs,
const int* input_length,
const int* context_length,
const int* cu_block_count,
const float* rope_theta,
int pf_batch_size,
int pf_num_token,
size_t layer_offset,
int pf_max_q_len,
int pf_max_k_len,
int pf_session_len,
const WeightType* weights);
void decode(T* output,
const T* qkv,
void** k_cache_ptrs,
void** v_cache_ptrs,
const int* cu_block_count,
const int* context_length,
const bool* is_finished,
const float* rope_theta,
size_t layer_offset,
int batch_size,
int dc_sum_seq_len,
int dc_max_seq_len,
int max_split_k,
const WeightType* weights);
void fusedMultiHeadAttention(T* output,
const T* query,
T** key_cache_ptrs,
T** val_cache_ptrs,
size_t cache_layer_offset,
T* attention_mask,
@@ -80,7 +128,9 @@ public:
int max_k_len,
int max_seq_len);
void unfusedMultiHeadAttention(T* output,
const T* query,
T** key_cache_ptrs,
T** val_cache_ptrs,
size_t cache_layer_offset,
const T* attention_mask,
@@ -116,6 +166,8 @@ private:
cublasMMWrapper* cublas_wrapper_;
LlamaLinear<T> linear_;
int arch_{};
T* qkv_buf_{};
T* q_buf_2_{};
T* k_buf_2_{};
@@ -126,6 +178,7 @@ private:
float* qk_buf_float_{};
T* qkv_buf_2_{};
T* qkv_buf_3_{};
float* dc_workspace_{};
bool is_allocate_buffer_ = false;
};
......
#include "src/turbomind/models/llama/unified_decoder.h"
#include "src/turbomind/kernels/bert_preprocess_kernels.h"
#include "src/turbomind/kernels/gpt_kernels.h"
#include "src/turbomind/models/llama/llama_decoder_kernels.h"
#include "src/turbomind/models/llama/llama_kernels.h"
#include "src/turbomind/models/llama/unified_attention_layer.h"
#include "src/turbomind/utils/cuda_utils.h"
namespace turbomind {
template<typename T>
void UnifiedDecoder<T>::allocateBuffer(size_t num_token, size_t pf_batch_size, size_t pf_max_q_len, size_t pf_max_k_len)
{
TM_LOG_DEBUG(__PRETTY_FUNCTION__);
if (pf_batch_size) {
attention_mask_ =
(T*)allocator_->reMalloc(attention_mask_, sizeof(T) * pf_batch_size * pf_max_q_len * pf_max_k_len, false);
padding_offset_ =
(int*)allocator_->reMalloc(padding_offset_, sizeof(int) * pf_batch_size * pf_max_q_len, false);
cu_seqlens_ = (int*)allocator_->reMalloc(cu_seqlens_, sizeof(int) * (pf_batch_size + 1), false);
}
}
template<typename T>
void UnifiedDecoder<T>::freeBuffer()
{
TM_LOG_DEBUG(__PRETTY_FUNCTION__);
allocator_->free((void**)&padding_offset_);
allocator_->free((void**)&cu_seqlens_);
allocator_->free((void**)&attention_mask_);
allocator_->free((void**)&h_pinned_token_num_ptr_, true);
}
template<typename T>
void UnifiedDecoder<T>::initialize(const LlamaAttentionParams& attn_params,
size_t kv_head_num,
bool use_fmha,
int cache_block_seq_len,
int quant_policy)
{
h_pinned_token_num_ptr_ = (size_t*)allocator_->reMalloc(h_pinned_token_num_ptr_, sizeof(size_t), true, true);
attn_layer_ = new UnifiedAttentionLayer<T>(head_num_,
kv_head_num,
size_per_head_,
attn_params,
tensor_para_,
stream_,
cublas_wrapper_,
allocator_,
is_free_buffer_after_forward_,
use_fmha,
cache_block_seq_len,
quant_policy);
ffn_layer_ = new LlamaFfnLayer<T>(head_num_,
size_per_head_,
inter_size_,
tensor_para_,
stream_,
cublas_wrapper_,
allocator_,
is_free_buffer_after_forward_);
}
template<typename T>
void UnifiedDecoder<T>::forwardSelfAttn(T* attn_io,
TensorMap* _outputs,
const TensorMap* _inputs,
size_t token_num,
size_t pf_batch_size,
size_t pf_max_q_len,
size_t pf_max_k_len,
size_t dc_batch_size,
int layer_id,
const LlamaAttentionWeight<T>* weight)
{
TensorMap inputs(*_inputs);
inputs.insert("input_query", {MEMORY_GPU, dtype_, {token_num, hidden_units_}, attn_io});
inputs.insert("layer_id", {MEMORY_CPU, TYPE_INT32, {1}, &layer_id});
if (pf_batch_size) {
inputs.insert("attention_mask",
{MEMORY_GPU, dtype_, {pf_batch_size, 1, pf_max_q_len, pf_max_k_len}, attention_mask_});
const size_t pf_token_num = token_num - dc_batch_size;
inputs.insert("padding_offset", {MEMORY_GPU, TYPE_INT32, {pf_token_num}, padding_offset_});
inputs.insert("cu_seqlens", {MEMORY_GPU, TYPE_INT32, {pf_batch_size + 1}, cu_seqlens_});
}
TensorMap outputs(*_outputs);
outputs.insert("hidden_features", {MEMORY_GPU, dtype_, {token_num, hidden_units_}, attn_io});
attn_layer_->forward(&outputs, &inputs, weight);
}
template<typename T>
UnifiedDecoder<T>::~UnifiedDecoder()
{
delete attn_layer_;
delete ffn_layer_;
freeBuffer();
}
template<typename T>
void UnifiedDecoder<T>::forward(TensorMap* outputs, const TensorMap* inputs, const std::vector<WeightType*>* weights)
{
/**
* input tensors:
* \param decoder_input [num_token, hidden_units], float
* \param input_lengths [batch_size], int
* \param history_lengths [batch_size], int
* \param context_lengths [batch_size], int
* \param output_norm_weight [hidden_dims], float
* \param max_q_len [1], int on cpu
* \param max_kv_len [1], int on cpu
* \param max_seq_len [1], int on cpu
*
* output tensors:
* \param decoder_output [num_token, hidden_units],
* \param key_cache [num_layer, batch, local_head_num, size_per_head // x, max_seq_len, x]
* \param value_cache [num_layer, batch, local_head_num, max_seq_len, size_per_head]
* \param last_token_hidden_units [batch_size, hidden_units]
*/
// Session sess{};
const size_t token_num = inputs->at("decoder_input").shape[0];
const int pf_max_q_len = inputs->getVal<int>("pf_max_q_len");
const int pf_max_k_len = inputs->getVal<int>("pf_max_k_len");
const int pf_batch_size = inputs->getVal<int>("pf_batch_size");
const int dc_batch_size = inputs->getVal<int>("dc_batch_size");
const int* input_length = inputs->getPtr<int>("input_lengths");
const int* context_length = inputs->getPtr<int>("context_lengths");
T* decoder_input_output = inputs->getPtr<T>("decoder_input");
T* decoder_output = outputs->getPtr<T>("decoder_output");
T* last_token_hidden_units = outputs->getPtr<T>("last_token_hidden_units");
allocateBuffer(token_num, pf_batch_size, pf_max_q_len, pf_max_k_len);
const int pf_offset = dc_batch_size;
if (pf_batch_size) {
FT_CHECK(padding_offset_);
size_t tmp_token_num{};
// `cu_seqlens` is exclusive sum of "input_lengths"
invokeGetPaddingOffsetAndCuSeqLens(h_pinned_token_num_ptr_,
&tmp_token_num, // updated token num
padding_offset_,
cu_seqlens_,
input_length + pf_offset,
pf_batch_size,
pf_max_q_len,
stream_);
sync_check_cuda_error();
FT_CHECK(tmp_token_num == token_num - dc_batch_size);
invokeCreateCausalMasks(attention_mask_,
input_length + pf_offset,
context_length + pf_offset,
pf_max_q_len,
pf_max_k_len,
pf_batch_size,
stream_);
sync_check_cuda_error();
}
/////////////////////////////////////////////
/// RMSNorm
invokeRootMeanSquareNorm(decoder_output,
decoder_input_output,
weights->at(0)->self_attn_norm_weights,
rmsnorm_eps_,
token_num,
hidden_units_,
stream_);
sync_check_cuda_error();
for (size_t layer = 0; layer < num_layer_; ++layer) {
/////////////////////////////////////////////
/// self-attention
forwardSelfAttn(decoder_output,
outputs,
inputs,
token_num,
pf_batch_size,
pf_max_q_len,
pf_max_k_len,
dc_batch_size,
layer,
&weights->at(layer)->self_attn_weights);
invokeFusedAddBiasResidualRMSNorm(decoder_input_output,
decoder_output,
weights->at(layer)->self_attn_weights.output.bias,
weights->at(layer)->ffn_norm_weights,
rmsnorm_eps_,
token_num,
hidden_units_,
stream_);
sync_check_cuda_error();
////////////////////////////////////////////
/// feed-forward network
TensorMap ffn_inputs{{"ffn_input", {MEMORY_GPU, dtype_, {token_num, hidden_units_}, decoder_output}}};
TensorMap ffn_outputs{{"ffn_output", {MEMORY_GPU, dtype_, {token_num, hidden_units_}, decoder_output}}};
ffn_layer_->forward(&ffn_outputs, &ffn_inputs, &weights->at(layer)->ffn_weights);
const bool is_last_layer = layer == num_layer_ - 1;
auto scale_weight = !is_last_layer ? weights->at(layer + 1)->self_attn_norm_weights :
inputs->at("output_norm_weight").getPtr<T>();
invokeFusedAddBiasResidualRMSNorm(decoder_input_output,
decoder_output,
weights->at(layer)->ffn_weights.output.bias,
scale_weight,
rmsnorm_eps_,
token_num,
hidden_units_,
stream_);
sync_check_cuda_error();
}
if (dc_batch_size) {
check_cuda_error(cudaMemcpyAsync(last_token_hidden_units,
decoder_output,
sizeof(T) * dc_batch_size * hidden_units_,
cudaMemcpyDefault,
stream_));
}
if (pf_batch_size) {
invokeGetFeatureOfLastToken(last_token_hidden_units + pf_offset * hidden_units_, //
decoder_output + pf_offset * hidden_units_,
cu_seqlens_,
hidden_units_,
pf_batch_size,
stream_);
sync_check_cuda_error();
}
if (is_free_buffer_after_forward_) {
freeBuffer();
}
}
template class UnifiedDecoder<float>;
template class UnifiedDecoder<half>;
} // namespace turbomind
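To make the tensor interface above concrete, here is a condensed, hypothetical call-site sketch; it only populates the keys that forward() itself reads. A real caller such as LlamaBatch also supplies the entries consumed by UnifiedAttentionLayer (k/v cache block pointers, rope_theta, finished flags, ...), which are passed through untouched. Every buffer and variable name below is an assumption for illustration.
// Hypothetical driver for UnifiedDecoder<T>::forward; not part of this commit.
#include "src/turbomind/models/llama/unified_decoder.h"
template<typename T>
void run_decoder(turbomind::UnifiedDecoder<T>* decoder,
                 const std::vector<turbomind::LlamaDecoderLayerWeight<T>*>* weights,
                 T* d_input, T* d_output, T* d_last_hidden, T* d_output_norm_weight,
                 int* d_input_lengths, int* d_context_lengths,
                 size_t token_num, size_t hidden_units, size_t batch_size,
                 int pf_batch_size, int dc_batch_size, int pf_max_q_len, int pf_max_k_len)
{
    using namespace turbomind;
    const DataType dtype = getTensorType<T>();
    TensorMap inputs{
        {"decoder_input", {MEMORY_GPU, dtype, {token_num, hidden_units}, d_input}},
        {"output_norm_weight", {MEMORY_GPU, dtype, {hidden_units}, d_output_norm_weight}},
        {"input_lengths", {MEMORY_GPU, TYPE_INT32, {batch_size}, d_input_lengths}},
        {"context_lengths", {MEMORY_GPU, TYPE_INT32, {batch_size}, d_context_lengths}},
        {"pf_batch_size", {MEMORY_CPU, TYPE_INT32, {1}, &pf_batch_size}},
        {"dc_batch_size", {MEMORY_CPU, TYPE_INT32, {1}, &dc_batch_size}},
        {"pf_max_q_len", {MEMORY_CPU, TYPE_INT32, {1}, &pf_max_q_len}},
        {"pf_max_k_len", {MEMORY_CPU, TYPE_INT32, {1}, &pf_max_k_len}}};
    TensorMap outputs{
        {"decoder_output", {MEMORY_GPU, dtype, {token_num, hidden_units}, d_output}},
        {"last_token_hidden_units", {MEMORY_GPU, dtype, {batch_size, hidden_units}, d_last_hidden}}};
    decoder->forward(&outputs, &inputs, weights);
}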
#pragma once
#include "src/turbomind/models/llama/LlamaDecoderLayerWeight.h"
#include "src/turbomind/models/llama/LlamaFfnLayer.h"
#include "src/turbomind/models/llama/llama_params.h"
#include "src/turbomind/models/llama/unified_attention_layer.h"
#include "src/turbomind/utils/cublasMMWrapper.h"
#include "src/turbomind/utils/nccl_utils.h"
namespace turbomind {
template<typename T>
class UnifiedDecoder {
protected:
void allocateBuffer(size_t num_token, size_t pfill_batch_size, size_t pfill_max_q_len, size_t pfill_max_k_len);
void freeBuffer();
void initialize(const LlamaAttentionParams& attn_params,
size_t kv_head_num,
bool use_fmha,
int cache_block_seq_len,
int quant_policy);
cudaStream_t stream_;
cublasMMWrapper* cublas_wrapper_;
IAllocator* allocator_;
bool is_free_buffer_after_forward_{};
size_t head_num_;
size_t size_per_head_;
size_t inter_size_;
size_t num_layer_;
size_t hidden_units_;
float rmsnorm_eps_;
NcclParam tensor_para_;
T* attention_mask_{};
int* padding_offset_{};
int* cu_seqlens_{}; // cu for cumulative
size_t* h_pinned_token_num_ptr_{};
UnifiedAttentionLayer<T>* attn_layer_{};
LlamaFfnLayer<T>* ffn_layer_{};
const DataType dtype_;
using WeightType = LlamaDecoderLayerWeight<T>;
void forwardSelfAttn(T* attn_io,
TensorMap* _outputs,
const TensorMap* _inputs,
size_t token_num,
size_t pf_batch_size,
size_t pf_max_q_len,
size_t pf_max_k_len,
size_t dc_batch_size,
int layer_id,
const LlamaAttentionWeight<T>* weight);
public:
UnifiedDecoder(size_t head_num,
size_t kv_head_num,
size_t size_per_head,
size_t inter_size,
size_t num_layer,
const LlamaAttentionParams& attn_params,
float rmsnorm_eps,
NcclParam tensor_para,
cudaStream_t stream,
cublasMMWrapper* cublas_wrapper,
IAllocator* allocator,
bool is_free_buffer_after_forward,
bool use_fmha,
int cache_block_seq_len,
int quant_policy):
stream_(stream),
cublas_wrapper_(cublas_wrapper),
allocator_(allocator),
is_free_buffer_after_forward_(is_free_buffer_after_forward),
head_num_(head_num),
size_per_head_(size_per_head),
inter_size_(inter_size),
hidden_units_(head_num * size_per_head),
num_layer_(num_layer),
rmsnorm_eps_(rmsnorm_eps),
tensor_para_(tensor_para),
dtype_(getTensorType<T>())
{
initialize(attn_params, kv_head_num, use_fmha, cache_block_seq_len, quant_policy);
}
~UnifiedDecoder();
void forward(TensorMap* outputs, const TensorMap* inputs, const std::vector<WeightType*>* weights);
};
} // namespace turbomind
@@ -89,16 +89,11 @@ private:
size_t num_layer_;
size_t vocab_size_;
turbomind::LlamaAttentionParams attn_params_;
turbomind::EngineParams engine_params_;
float norm_eps_;
int start_id_;
int end_id_;
int cache_block_seq_len_;
int use_context_fmha_;
size_t tensor_para_size_;
size_t pipeline_para_size_;
......
// Copyright (c) OpenMMLab. All rights reserved.
#pragma once
#include <utility>
namespace turbomind {
namespace detail {
template<int X>
inline constexpr std::integral_constant<int, X> _Int{};
template<class F, class P, class G, int... Xs, std::size_t... Is>
bool dispatch_impl(F&& f, P&& p, G g, std::integer_sequence<int, Xs...>, std::index_sequence<Is...>)
{
constexpr int N = sizeof...(Xs);
return (((((P &&) p)(_Int<Xs>) || (g && Is == N - 1)) && (((F &&) f)(_Int<Xs>), 1)) || ...);
}
} // namespace detail
template<class F, class P, int... Is, class G = std::true_type>
bool dispatch(std::integer_sequence<int, Is...> seq, P&& p, F&& f, G g = {})
{
return detail::dispatch_impl((F &&) f, (P &&) p, g, seq, std::make_index_sequence<sizeof...(Is)>{});
}
template<class F, int... Is, class G = std::true_type>
bool dispatch(std::integer_sequence<int, Is...> seq, F&& f)
{
return (((F &&) f)(detail::_Int<Is>) || ...);
}
} // namespace turbomind
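A short usage sketch of the utility above (the function below is hypothetical, not from this commit). The three-argument overload calls f with the first compile-time constant accepted by the predicate p; with the default fourth argument the last constant acts as a fallback, so the call always dispatches. The two-argument overload instead lets f itself return true to stop the scan, which is how invokeIndexedCopy uses it.
#include <cstdio>
#include "src/turbomind/utils/dispatch.h"
void dispatch_example(int count)
{
    // picks the smallest tile in {4, 8, 16} with count <= tile; falls back to 16 otherwise
    turbomind::dispatch(
        std::integer_sequence<int, 4, 8, 16>{},
        [&](auto C) { return count <= C; },                     // predicate on the runtime value
        [&](auto C) { std::printf("tile = %d\n", (int)C); });   // C is a std::integral_constant<int, ...>
}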