// Copyright (c) OpenMMLab. All rights reserved.

#pragma once

// #include "src/turbomind/models/llama/LlamaCacheManager.h"
#include "src/turbomind/models/llama/Barrier.h"
#include "src/turbomind/models/llama/LlamaNcclGuard.h"
#include "src/turbomind/models/llama/Request.h"
#include "src/turbomind/models/llama/SequenceManager.h"
#include "src/turbomind/utils/allocator.h"
#include "src/turbomind/utils/cublasMMWrapper.h"
#include <array>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

namespace turbomind {

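// Host/device bookkeeping for one batch of sequences: per-slot context lengths,
// finish flags, sampling (curand) states, output ids, and the Sequence/Request
// objects backing each slot. The diagram below sketches how slots are ordered.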
struct BatchState {
    int*  h_context_length;
    bool* h_finished;

    void* top_k_curand_state;
    void* top_p_curand_state;
    int*  output_ids;  // output ids in [B, S]

    float* h_rope_theta;

    std::vector<int> seq_len_limit;
    std::vector<int> is_swap_in;

    std::vector<const Sequence*>          sequences;
    std::vector<std::shared_ptr<Request>> requests;

    // |<-- existing -->|<-- swap-in -->|
    // |<----------- active ----------->|<-- inactive -->|
    int active_size;
    int size;
};

template<typename T>
class LlamaV2;

template<typename T>
class LlamaBatch {
public:
    void AllocateBuffer(size_t batch_size, size_t session_len);
    void AllocatePersistantBuffer(size_t max_batch_size);
    void FreeBuffer();

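    // A `Signal` is a deferred per-request callback; the vectors of signals returned by
    // `ProcessStopRequests`/`Finish` are presumably fired via `BarrierSignalRequests` below.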
    using Requests = std::vector<std::shared_ptr<Request>>;
    using Signal   = std::function<void()>;

    void RejectInvalidRequests(Requests& stop_reqs, Requests& infer_reqs);

    [[nodiscard]] auto ProcessStopRequests(const Requests& requests) -> std::vector<Signal>;

    void ProcessInferRequests(const Requests& requests);

    [[nodiscard]] bool Initialize();

    void ContextDecode();

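    // Per-step bookkeeping for the token-generation loop: the longest initial context
    // length, the current decoding `step`, and running sum/max of sequence lengths.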
    struct GenerationState {
        int max_init_ctx_len;
        int step;
        int sum_seq_len;
        int max_seq_len;
    };

    void            InitializeSampling();
    GenerationState InitializeGeneration();

    [[nodiscard]] bool Generate(GenerationState& g);

    [[nodiscard]] auto Finish(GenerationState& g) -> std::vector<Signal>;

    void CompleteRequest(int index, bool is_stop_request, bool is_force_end);

    void SetOutputTensors(const GenerationState& g);

    void
    OutputContextLogits(T* context_decoder_output, const std::vector<int>& indices, const std::vector<int>& lengths);

    explicit LlamaBatch(int                              max_batch_size,
                        int                              max_context_token_num,
                        int                              session_len,
                        std::unique_ptr<SequenceManager> sequence_manager,
                        LlamaV2<T>*                      llama);

    ~LlamaBatch()
    {
        TM_LOG_ERROR("~LlamaBatch()");
        model_->shared_state_->request_queue.close();

        internal_thread_.join();

        if (output_thread_.joinable()) {
            {
                std::lock_guard lock{output_mutex_};
                output_stop_token_ = true;
            }
            output_cv_.notify_one();
            output_thread_.join();
        }

        FreeBuffer();
    }

    void Start();

private:
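    // Entry point of `internal_thread_` (launched by `Start()`): presumably the main
    // serving loop that pulls requests from the shared queue and drives
    // ProcessInferRequests -> Initialize -> ContextDecode -> Generate -> Finish.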
    void InternalThreadEntry(int device_id);

    void OutputThreadEntry();

    void UpdateSequenceStates(BatchState& state, int index);

    void CopyState(const std::pair<BatchState*, int> _src, const std::pair<BatchState*, int>& _dst);

    void SaveRandomState(BatchState& state, int idx);

    void LoadRandomState(BatchState& state, int idx);

    void BarrierSignalRequests(Barrier& barrier, const std::vector<Signal>& signals);

    // analogous to `std::copy_n`, but asynchronous on `stream_`; returns the advanced destination pointer
    template<typename U>
    U* Copy(const U* src, size_t count, U* dst)
    {
        check_cuda_error(cudaMemcpyAsync(dst, src, sizeof(U) * count, cudaMemcpyDefault, stream_));
        return dst += count;
    }

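    // zero-fills `count` elements of `data` asynchronously on `stream_` and
    // returns the pointer advanced past the cleared range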
    template<typename U>
    U* Clear(U* data, size_t count)
    {
        check_cuda_error(cudaMemsetAsync(data, 0, sizeof(U) * count, stream_));
        return data += count;
    }

private:
    const int  max_batch_size_;
    const int  max_context_token_num_;
    const int  session_len_;
    const int  rank_;
    const bool debug_;
    const int  step_length_;

    LlamaV2<T>* const model_;

    std::unique_ptr<SequenceManager> sequence_manager_;

    ///////////////////////////////////////////////////////////////////
    // k/v cache block buffers
    int*       cu_block_counts_{};
    uintptr_t* k_block_ptrs_{};
    uintptr_t* v_block_ptrs_{};

    ////////////////////////////////////////////////////////////////////
    // context decoding temp buffers
    T*   context_decoder_input_buf_{};
    T*   context_decoder_output_buf_{};
    int* context_decoder_ids_buf_{};
    int* input_ids_buf_{};
    // lengths
    int* input_length_buf_{};    // input + cache missed length
    int* context_length_buf_{};  // history length + input_length
    // temp buffers used for block->linear kv-cache conversion
    T*     tmp_k_cache_buf_{};
    T*     tmp_v_cache_buf_{};
    void** tmp_k_ptrs_{};
    void** tmp_v_ptrs_{};
    void** h_tmp_k_ptrs_{};
    void** h_tmp_v_ptrs_{};

    T*   decoder_input_buf_{};
    T*   decoder_output_buf_{};
    int* sequence_lengths_{};  // current sequence length
    int* init_ctx_lens_{};

    float* logits_buf_{};        // combined logits
    float* local_logits_buf_{};  // tensor parallel local logits
    float* context_logits_buf_{};
    float* local_context_logits_buf_{};

    float* rope_theta_{};

    // used by dynamic decoder
    int*      token_ids_buf_{};  // all token IDs in [S, B], indexed using `step`
    int*      end_ids_buf_{};
    bool*     finished_buf_{};
    uint32_t* seq_limit_len_{};

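    // per-request output destinations: device-side pointer/length arrays plus their
    // host-side mirrors (`h_*`), presumably used when scattering generated ids and
    // sequence lengths back to each request's output tensors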
    int** request_output_ids_ptrs_{};
    int*  request_output_ids_lens_{};
    int** request_seqlen_ptrs_{};
    int** h_request_output_ids_ptrs_{};
    int*  h_request_output_ids_lens_{};
    int** h_request_seqlen_ptrs_{};

    // pinned buffers
    int*       h_input_ids_buf_{};
    int*       h_input_length_buf_{};
    uint32_t*  h_seq_limit_len_{};
    int*       h_cu_block_counts_{};
    uintptr_t* h_k_block_ptrs_{};
    uintptr_t* h_v_block_ptrs_{};

    int*      stop_words_buf_{};  // [batch_size, 2, kMaxStopBadWordsLen]
    int*      bad_words_buf_{};
    int*      h_runtime_top_k_{};
    float*    h_runtime_top_p_{};
    float*    h_temperature_{};
    float*    h_repetition_penalty_{};
    uint64_t* h_random_seed_{};

    std::array<BatchState, 3> states_{};

    BatchState* state_{};
    BatchState* back_{};
    BatchState* incoming_{};

    uint64_t request_count_{0};

    // hard limits for persistent buffers
    static constexpr int kMaxStopBadWordsLen = 32;

    const DataType data_type_{};

    bool is_allocate_persistant_buffer_ = false;
    bool is_allocate_buffer_            = false;

    TensorMap inputs_;
    TensorMap outputs_;

    std::unordered_map<std::string, void*> sampling_params_;

    cudaStream_t     stream_{};
    cublasMMWrapper* cublas_wrapper_{};
    IAllocator*      allocator_{};

    std::thread internal_thread_;

    // async stream callback utils
    std::thread             output_thread_;
    std::mutex              output_mutex_;
    std::condition_variable output_cv_;
    Requests                output_reqs_;
    bool                    output_stop_token_{false};
};

}  // namespace turbomind