Unverified Commit 55644d82 authored by thatPepe, committed by GitHub

add Memory Pool

* Tensor memory pool - initial commit

* Adjusted workspace management to use the memory pool.

* Re-differentiated create and createAsync in Storage

* Set initial memory pool size to 0

* Separated createAsync and createFromPool

* Removed the redundant WorkspaceHandle

* Adjusted naming

* Set default mempool size to 0 in declaration

* Adjusted format
parent 53cb1582
@@ -2,24 +2,47 @@
#define ALLOCATOR_HPP
#include "infinicore_infer.h"
#include <map>
#include <memory>
#include <set>
#include <unordered_map>
#include <vector>
class AllocatorBase {
public:
    virtual void *alloc(size_t size) = 0;
    virtual void release(void *ptr) = 0;
    virtual ~AllocatorBase() = default;
};
class WorkspaceAllocator : public AllocatorBase {
private:
    void *_memory;
    size_t _total_size;
    size_t _align;

public:
    WorkspaceAllocator(size_t initial_size, size_t align = 256);
    ~WorkspaceAllocator();
    void *alloc(size_t size) override;
    void release(void *ptr) override;
};
class MemoryPool : public AllocatorBase {
public:
    MemoryPool(size_t initialSize = 0);
    ~MemoryPool();
    void *alloc(size_t size) override;
    void release(void *ptr) override;
private:
    struct Block {
        void *base;
        void *ptr;
        size_t size;
        bool is_free;
        Block(void *b, void *p, size_t s, bool f)
            : base(b), ptr(p), size(s), is_free(f) {}
        bool operator<(const Block &other) const {
            return ptr < other.ptr; // blocks are ordered by address
        }
    };
    void *allocateNewRegion(size_t size);
    void insertFreeBlock(Block &&block);
    void tryCoalesce(const Block &block);
    std::vector<void *> _base_regions;                                    // one entry per region obtained from infinirtMalloc
    std::set<Block> _all_blocks;                                          // every block, free or busy, address-ordered
    std::multimap<size_t, std::set<Block>::iterator> _free_blocks;        // size -> free block, for best-fit lookup
    std::unordered_map<void *, std::set<Block>::iterator> _ptr_to_block;  // live allocations, for O(1) release
};
#endif
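The pool keeps three views of the same blocks: an address-ordered set of all blocks, a size-indexed free list, and a pointer map for releases. A minimal usage sketch follows; the driver function and the sizes are invented for illustration and assume a device context is already set up:

// Hypothetical usage sketch, not part of the commit.
void memory_pool_example() {
    MemoryPool pool(1 << 20);          // start with a single 1 MiB region
    void *a = pool.alloc(256 * 1024);  // carved from the front of the region
    void *b = pool.alloc(256 * 1024);  // carved from the remaining free block
    pool.release(a);                   // a is free, but b still separates it from the tail
    pool.release(b);                   // b coalesces with a and the tail into one free block
}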
#include "../allocator.hpp"
#include "../utils.hpp"
#include <algorithm>
#include <iostream>
#include <new> // std::bad_alloc
#include <stdexcept>
MemoryPool::MemoryPool(size_t initialSize) {
    // Default size is 0: defer any device allocation until the first alloc().
    if (initialSize > 0) {
        allocateNewRegion(initialSize);
    }
}
MemoryPool::~MemoryPool() {
    for (void *region : _base_regions) {
        RUN_INFINI(infinirtFree(region));
    }
}
void *MemoryPool::alloc(size_t size) {
    // Best-fit search: first free block whose size is >= the request.
    auto it = _free_blocks.lower_bound(size);
    if (it == _free_blocks.end()) {
        // No free block is large enough; grab a fresh region from the runtime.
        allocateNewRegion(size);
        it = _free_blocks.lower_bound(size);
        if (it == _free_blocks.end()) {
            throw std::bad_alloc();
        }
    }
    auto block_it = it->second;
    Block block = *block_it;
    _free_blocks.erase(it);
    _all_blocks.erase(block_it);
    if (block.size > size + 256) {
        // Split: hand out the front of the block, keep the tail on the free list.
        void *alloc_ptr = block.ptr;
        void *rem_ptr = static_cast<char *>(block.ptr) + size;
        size_t rem_size = block.size - size;
        Block alloc_block(block.base, alloc_ptr, size, false);
        Block rem_block(block.base, rem_ptr, rem_size, true);
        auto alloc_it = _all_blocks.insert(alloc_block).first;
        auto rem_it = _all_blocks.insert(rem_block).first;
        _free_blocks.emplace(rem_size, rem_it);
        _ptr_to_block[alloc_ptr] = alloc_it;
        return alloc_ptr;
    } else {
        // Remainder would be too small (<= 256 bytes) to track: no split.
        block.is_free = false;
        auto alloc_it = _all_blocks.insert(block).first;
        _ptr_to_block[block.ptr] = alloc_it;
        return block.ptr;
    }
}
void MemoryPool::release(void *ptr) {
    auto it = _ptr_to_block.find(ptr);
    if (it == _ptr_to_block.end()) {
        throw std::runtime_error("Invalid pointer to free");
    }
    auto block_it = it->second;
    Block block = *block_it;
    _all_blocks.erase(block_it);
    // Mark the block free and re-insert it; tryCoalesce() merges it with any
    // free neighbors and is responsible for adding the result to _free_blocks.
    block.is_free = true;
    auto new_it = _all_blocks.insert(block).first;
    _ptr_to_block.erase(ptr);
    tryCoalesce(*new_it);
}
void *MemoryPool::allocateNewRegion(size_t size) {
    void *ptr = nullptr;
    RUN_INFINI(infinirtMalloc(&ptr, size));
    _base_regions.push_back(ptr);
    // The whole region starts life as a single free block.
    Block new_block(ptr, ptr, size, true);
    auto it = _all_blocks.insert(new_block).first;
    _free_blocks.emplace(size, it);
    return ptr;
}
void MemoryPool::tryCoalesce(const Block &block) {
    auto it = _all_blocks.find(block);
    if (it == _all_blocks.end()) {
        return;
    }
    // Erase one specific (size -> block) entry from the free list. Erasing
    // _free_blocks by key alone would drop *every* free block of that size,
    // leaking the others.
    auto erase_free_entry = [this](std::set<Block>::iterator target) {
        auto range = _free_blocks.equal_range(target->size);
        for (auto fit = range.first; fit != range.second; ++fit) {
            if (fit->second == target) {
                _free_blocks.erase(fit);
                return;
            }
        }
    };
    Block merged = *it;
    auto next = std::next(it);
    auto prev = (it == _all_blocks.begin()) ? _all_blocks.end() : std::prev(it);
    // Precondition: the freshly released block is not in _free_blocks yet,
    // so only its neighbors need to be unlinked from the free list.
    _all_blocks.erase(it);
    // Coalesce with next
    if (next != _all_blocks.end() && next->is_free && static_cast<char *>(merged.ptr) + merged.size == next->ptr) {
        erase_free_entry(next);
        merged.size += next->size;
        _all_blocks.erase(next);
    }
    // Coalesce with prev
    if (prev != _all_blocks.end() && prev->is_free && static_cast<char *>(prev->ptr) + prev->size == merged.ptr) {
        erase_free_entry(prev);
        merged.ptr = prev->ptr;
        merged.size += prev->size;
        merged.base = prev->base;
        _all_blocks.erase(prev);
    }
    merged.is_free = true;
    auto new_it = _all_blocks.insert(merged).first;
    _free_blocks.emplace(merged.size, new_it);
}
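One subtlety in the bookkeeping above: _free_blocks is a std::multimap, so several free blocks can share one size key, and an entry must be erased by matching iterator rather than by key. A self-contained, host-only illustration of the pattern (hypothetical, independent of the runtime):

// Hypothetical standalone demo, not part of the commit.
#include <cassert>
#include <map>

int main() {
    std::multimap<size_t, int> free_blocks; // size -> block id
    free_blocks.emplace(4096, 1);
    free_blocks.emplace(4096, 2);
    // Erase only block 1 by scanning the equal range for a matching value:
    auto range = free_blocks.equal_range(4096);
    for (auto it = range.first; it != range.second; ++it) {
        if (it->second == 1) {
            free_blocks.erase(it);
            break;
        }
    }
    assert(free_blocks.size() == 1); // erase-by-key would have removed both
    return 0;
}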
#include "../allocator.hpp"
#include "../utils.hpp"
// Round size_ up to the next multiple of align (align must be a power of two).
inline size_t aligned_size(size_t size_, size_t align) {
    return (size_ + align - 1) & ~(align - 1);
}
inline void *allocate(size_t size_) {
    void *ptr;
    RUN_INFINI(infinirtMalloc(&ptr, size_));
    return ptr;
}
WorkspaceAllocator::WorkspaceAllocator(size_t initial_size_, size_t align) {
    _align = align;
    _total_size = 0;
    _memory = nullptr;
    if (initial_size_ > 0) {
        _total_size = aligned_size(initial_size_, _align);
        _memory = allocate(_total_size);
    }
}
void *WorkspaceAllocator::alloc(size_t new_size) {
    // Grow-only strategy: reallocate only when the request outgrows the
    // current buffer, otherwise hand back the same memory.
    if (_total_size < new_size) {
        if (_total_size != 0) {
            // In-flight kernels may still reference the old buffer,
            // so synchronize before freeing it.
            RUN_INFINI(infinirtDeviceSynchronize());
            RUN_INFINI(infinirtFree(_memory));
        }
        _total_size = aligned_size(new_size, _align);
        _memory = allocate(_total_size);
    }
    return _memory;
}
void WorkspaceAllocator::release(void *ptr) {
    // No-op: the single workspace buffer is reused across calls and only
    // freed in the destructor.
}
WorkspaceAllocator::~WorkspaceAllocator() {
    if (_memory != nullptr) {
        RUN_INFINI(infinirtDeviceSynchronize());
        RUN_INFINI(infinirtFree(_memory));
    }
}
\ No newline at end of file
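The bit trick in aligned_size rounds a size up to the next multiple of a power-of-two alignment. A quick standalone sanity check (hypothetical test, not part of the commit):

// Hypothetical test of the rounding behavior.
#include <cassert>
#include <cstddef>

static size_t aligned_size(size_t size_, size_t align) {
    return (size_ + align - 1) & ~(align - 1);
}

int main() {
    assert(aligned_size(1000, 256) == 1024); // rounded up
    assert(aligned_size(1024, 256) == 1024); // already aligned: unchanged
    assert(aligned_size(1, 256) == 256);     // minimum one alignment unit
    return 0;
}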
@@ -41,6 +41,8 @@ void createDeviceResource(DeviceResource *rsrc, const JiugeMeta *meta,
getFFNDown(meta, weights, layer, idev, ndev));
}
auto memory_pool = std::make_shared<MemoryPool>(128 * 1024 * 1024);
*rsrc = DeviceResource{
device,
dev_id,
@@ -59,7 +61,7 @@ void createDeviceResource(DeviceResource *rsrc, const JiugeMeta *meta,
w_ffn_down,
stream,
comm,
std::make_unique<WorkspaceAllocator>(0),
memory_pool,
};
RUN_INFINI(infinirtDeviceSynchronize());
}
@@ -100,7 +102,6 @@ void releaseDeviceResource(DeviceResource &res) {
t.reset();
}
res.w_ffn_down.clear();
res.workspace_allocator.reset();
infiniopDestroyHandle(res.handle);
res.handle = nullptr;
infinirtStreamDestroy(res.stream);
@@ -130,13 +131,13 @@ void inferDeviceBatch(const JiugeMeta &meta, DeviceResource &rsrc,
bool has_qkv_bias = rsrc.b_attn_qkv.size() > 0;
// Allocate buffers
auto logits_in = Tensor::buffer(dt_logits, {ntok, d}, stream);
auto logits_out = Tensor::buffer(dt_logits, {ntok, d}, stream);
auto qkv_buf = Tensor::buffer(dt_logits, {ntok, (nh + nkvh * 2) * dh}, stream);
auto gate_up_buf = Tensor::buffer(dt_logits, {ntok, 2 * di}, stream);
auto o_buf = Tensor::buffer(dt_logits, {ntok, nh * dh}, stream);
auto prob_buf = Tensor::buffer(dt_logits, {nreq, dvoc}, stream);
auto result_buf = Tensor::buffer(INFINI_DTYPE_I64, {nreq}, stream);
auto logits_in = Tensor::buffer(dt_logits, {ntok, d}, rsrc.memory_pool);
auto logits_out = Tensor::buffer(dt_logits, {ntok, d}, rsrc.memory_pool);
auto qkv_buf = Tensor::buffer(dt_logits, {ntok, (nh + nkvh * 2) * dh}, rsrc.memory_pool);
auto gate_up_buf = Tensor::buffer(dt_logits, {ntok, 2 * di}, rsrc.memory_pool);
auto o_buf = Tensor::buffer(dt_logits, {ntok, nh * dh}, rsrc.memory_pool);
auto prob_buf = Tensor::buffer(dt_logits, {nreq, dvoc}, rsrc.memory_pool);
auto result_buf = Tensor::buffer(INFINI_DTYPE_I64, {nreq}, rsrc.memory_pool);
auto result_cpu = std::vector<int64_t>(nreq);
// Prepare inputs
@@ -153,7 +154,7 @@ void inferDeviceBatch(const JiugeMeta &meta, DeviceResource &rsrc,
if (rsrc.device == INFINI_DEVICE_CPU) {
pos_ids_buf = Tensor::weight(batch_pos_ids.data(), INFINI_DTYPE_U32, {ntok});
} else {
pos_ids_buf = Tensor::buffer(INFINI_DTYPE_U32, {ntok}, stream);
pos_ids_buf = Tensor::buffer(INFINI_DTYPE_U32, {ntok}, rsrc.memory_pool);
RUN_INFINI(infinirtMemcpyAsync(pos_ids_buf->data(), batch_pos_ids.data(), sizeof(uint32_t) * ntok,
INFINIRT_MEMCPY_H2D, stream));
}
@@ -164,7 +165,6 @@ void inferDeviceBatch(const JiugeMeta &meta, DeviceResource &rsrc,
}
// Prepare operators and workspace
void *workspace;
size_t workspace_size = 0, temp_size = 0;
// attn & mlp rmsnorm
infiniopRMSNormDescriptor_t desc_norm;
@@ -270,9 +270,9 @@ void inferDeviceBatch(const JiugeMeta &meta, DeviceResource &rsrc,
token_offset += seq_len;
}
auto qk_buf = Tensor::buffer(dt_logits, {nh, max_qk_size}, stream);
auto rearrange_q_buf = Tensor::buffer(dt_logits, {nkvh, ngroup * max_seq_len, dh}, stream);
auto attn_val_buf = Tensor::buffer(dt_logits, {nh, max_seq_len, dh}, stream);
auto qk_buf = Tensor::buffer(dt_logits, {nh, max_qk_size}, rsrc.memory_pool);
auto rearrange_q_buf = Tensor::buffer(dt_logits, {nkvh, ngroup * max_seq_len, dh}, rsrc.memory_pool);
auto attn_val_buf = Tensor::buffer(dt_logits, {nh, max_seq_len, dh}, rsrc.memory_pool);
// MLP descriptors
infiniopGemmDescriptor_t desc_ffn_gate_up, desc_ffn_down;
@@ -317,7 +317,8 @@ void inferDeviceBatch(const JiugeMeta &meta, DeviceResource &rsrc,
RUN_INFINI(infiniopGetRandomSampleWorkspaceSize(desc_sample, &temp_size));
workspace_size = std::max(workspace_size, temp_size);
// Allocate workspace
workspace = rsrc.workspace_allocator->alloc(workspace_size);
std::shared_ptr<Storage> workspace_storage = Storage::createFromPool(workspace_size, rsrc.memory_pool);
void *workspace = workspace_storage->memory;
// Compute
for (uint32_t layer = 0; layer < nlayer; layer++) {
......
@@ -27,7 +27,7 @@ struct DeviceResource {
// Communicator
infinicclComm_t comm;
std::unique_ptr<WorkspaceAllocator> workspace_allocator;
std::shared_ptr<MemoryPool> memory_pool;
};
struct InferState {
......
#ifndef INFER_TENSOR_H
#define INFER_TENSOR_H
#include "allocator.hpp"
#include "infinicore_infer.h"
#include "utils.hpp"
#include <memory>
#include <string>
#include <vector>
struct Storage {
class Storage {
public:
    void *memory;
    size_t size;
    infiniDevice_t device_type;
    int device_id;
    std::shared_ptr<MemoryPool> memory_pool;
    static std::shared_ptr<Storage> create(size_t size);
    static std::shared_ptr<Storage> createAsync(size_t size, infinirtStream_t stream = nullptr);
    static std::shared_ptr<Storage> createFromPool(size_t size, std::shared_ptr<MemoryPool> pool = nullptr);
    static std::shared_ptr<Storage> createHost(size_t size);
    ~Storage();
};
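Storage now has four factory paths with distinct lifetimes. A hedged sketch of how they differ (the wrapper function, sizes, and comments are illustrative assumptions, not from the commit):

// Hypothetical sketch of the factory variants declared above.
void storage_factories_example(std::shared_ptr<MemoryPool> pool, infinirtStream_t stream) {
    auto plain  = Storage::create(4096);               // direct device allocation
    auto async  = Storage::createAsync(4096, stream);  // allocation associated with a stream
    auto pooled = Storage::createFromPool(4096, pool); // sub-allocated from the shared pool
    auto host   = Storage::createHost(4096);           // pinned host allocation
}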
@@ -68,7 +72,7 @@ private:
public:
static std::shared_ptr<Tensor> buffer(infiniDtype_t dtype,
const std::vector<size_t> &shape,
infinirtStream_t stream = nullptr);
std::shared_ptr<MemoryPool> pool = nullptr);
static std::shared_ptr<Tensor> weight(void *host_data,
infiniDtype_t dtype,
const std::vector<size_t> &shape);
......
#include "../allocator.hpp"
#include "../tensor.hpp"
std::shared_ptr<Storage> Storage::create(size_t size) {
@@ -16,19 +17,37 @@ std::shared_ptr<Storage> Storage::createAsync(size_t size, infinirtStream_t stre
return storage;
}
std::shared_ptr<Storage> Storage::createFromPool(size_t size, std::shared_ptr<MemoryPool> pool) {
    auto storage = std::make_shared<Storage>();
    storage->memory_pool = pool;
    if (pool) {
        storage->memory = pool->alloc(size);
    } else {
        // No pool supplied: fall back to a direct runtime allocation.
        RUN_INFINI(infinirtMalloc(&storage->memory, size));
    }
    storage->size = size;
    RUN_INFINI(infinirtGetDevice(&storage->device_type, &storage->device_id));
    return storage;
}
std::shared_ptr<Storage> Storage::createHost(size_t size) {
    auto storage = std::make_shared<Storage>();
    RUN_INFINI(infinirtMallocHost(&storage->memory, size));
    storage->size = size;
    storage->device_type = INFINI_DEVICE_CPU;
    storage->device_id = 0;
    storage->memory_pool = nullptr; // No pool for host memory
    return storage;
}
Storage::~Storage() {
    if (device_type == INFINI_DEVICE_CPU) {
        RUN_INFINI(infinirtFreeHost(memory));
    } else {
        if (memory_pool) {
            memory_pool->release(memory);
        } else {
            RUN_INFINI(infinirtFree(memory));
        }
    }
}
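The destructor is what ties the pool to tensor lifetimes: a pool-backed Storage returns its block on destruction instead of calling infinirtFree. A small illustration (the scope and size are invented):

// Hypothetical illustration of the RAII path above.
void scoped_storage_example(std::shared_ptr<MemoryPool> pool) {
    {
        auto s = Storage::createFromPool(1 << 20, pool);
        // ... use s->memory on the current device ...
    } // last reference dropped: ~Storage() returns the block via memory_pool->release()
}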
@@ -68,7 +68,7 @@ std::shared_ptr<TensorDesc> Tensor::desc() const { return TensorDesc::create(thi
std::shared_ptr<Tensor> Tensor::buffer(infiniDtype_t dtype,
const std::vector<size_t> &shape,
infinirtStream_t stream) {
std::shared_ptr<MemoryPool> pool) {
std::shared_ptr<Tensor> tensor = std::make_shared<Tensor>();
tensor->_dtype = dtype;
auto ndim = shape.size();
@@ -83,7 +83,7 @@ std::shared_ptr<Tensor> Tensor::buffer(infiniDtype_t dtype,
}
}
tensor->_strides = strides;
tensor->_storage = Storage::createAsync(size, stream);
tensor->_storage = Storage::createFromPool(size, pool);
tensor->_data = tensor->_storage->memory;
infiniopCreateTensorDescriptor(&tensor->_desc, ndim, tensor->_shape.data(),
strides.data(), dtype);
......
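Putting it together: temporary tensors in inferDeviceBatch now draw from one shared pool instead of per-call async allocations. A hypothetical end-to-end sketch (the function, shapes, and dtype are invented; Tensor::buffer and MemoryPool are from this commit):

// Hypothetical end-to-end usage, not part of the commit.
void batch_buffers_example() {
    auto pool = std::make_shared<MemoryPool>(128 * 1024 * 1024); // as in createDeviceResource
    auto x = Tensor::buffer(INFINI_DTYPE_U32, {1024, 4096}, pool);
    auto y = Tensor::buffer(INFINI_DTYPE_U32, {1024, 4096}, pool);
    // ... enqueue kernels reading/writing x->data() and y->data() ...
    // When x and y go out of scope, their Storage blocks return to the pool,
    // so the next batch reuses the same device memory without fresh mallocs.
}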