Commit 578fd2d7 authored by Ville Pietilä's avatar Ville Pietilä
Browse files

Add alternative memory pool implementation.

parent db108e89
......@@ -8,15 +8,25 @@
#include <map>
#include <queue>
#include <mutex>
#include <cstddef>
#include <limits>
#include "unistd.h"
namespace ck {
namespace memory {
class MemPool
class IMemPool
{
public:
MemPool() :
virtual ~IMemPool() = default;
virtual void* allocate(std::size_t sizeInBytes) = 0;
virtual void deallocate(void* p, std::size_t sizeInBytes) = 0;
};
class DynamicMemPool : public IMemPool
{
public:
DynamicMemPool() :
enableLogging_(ck::EnvIsEnabled(CK_ENV(CK_LOGGING))),
pid_(getpid())
{
......@@ -24,7 +34,7 @@ namespace memory {
std::cout << "[ MemPool ] Created memory pool for process " << pid_ << std::endl;
}
~MemPool()
~DynamicMemPool() override
{
if (enableLogging_)
std::cout << "[ MemPool ] Deleting pool for process " << pid_ << "..."<< std::endl;
......@@ -39,27 +49,46 @@ namespace memory {
std::cout << "[ MemPool ] Deleted pool for process " << pid_ << std::endl;
}
void* allocate(std::size_t sizeInBytes)
void* allocate(std::size_t sizeInBytes) override
{
std::lock_guard<std::mutex> lock(mutex_);
// If there is a memory pool for the requested size, return the memory from the pool.
if (memory_pool_.find(sizeInBytes) != memory_pool_.end() && !memory_pool_[sizeInBytes].empty())
{
if (enableLogging_)
{
std::cout << "[ MemPool ] Reusing memory from pool for size " << sizeInBytes << std::endl;
}
void* p = memory_pool_[sizeInBytes].front();
memory_pool_[sizeInBytes].pop();
memPoolSizeInBytes_ -= sizeInBytes;
if (enableLogging_)
{
std::cout << "[ MemPool ] Total memory in pool: " << memPoolSizeInBytes_ << std::endl;
}
return p;
}
if (enableLogging_)
{
std::cout << "[ MemPool ] Allocating new memory for size " << sizeInBytes << std::endl;
}
void* p;
hip_check_error(hipHostMalloc(&p, sizeInBytes));
constexpr unsigned flags = hipDeviceScheduleYield; //hipDeviceScheduleSpin doesn not work, leads to freezing.
hip_check_error(hipHostMalloc(&p, sizeInBytes, flags));
return p;
}
void deallocate(void* p, std::size_t sizeInBytes)
void deallocate(void* p, std::size_t sizeInBytes) override
{
std::lock_guard<std::mutex> lock(mutex_);
if (memory_pool_.find(sizeInBytes) != memory_pool_.end())
{
if (enableLogging_)
{
std::cout << "[ MemPool ] Adding memory to pool for size " << sizeInBytes << std::endl;
}
auto& q = memory_pool_[sizeInBytes];
q.push(p);
memPoolSizeInBytes_ += sizeInBytes;
......@@ -75,14 +104,22 @@ namespace memory {
}
}
else {
if (enableLogging_)
{
std::cout << "[ MemPool ] Creating new pool queue for size " << sizeInBytes << std::endl;
}
std::queue<void*> q;
q.push(p);
memory_pool_.insert(std::make_pair(sizeInBytes, std::move(q)));
memPoolSizeInBytes_ += sizeInBytes;
}
if (enableLogging_)
{
std::cout << "[ MemPool ] Total memory in pool: " << memPoolSizeInBytes_ << std::endl;
}
}
private:
constexpr static size_t maxMemoryPoolSizeInBytes_ = 10 * 1024 * 1024; // 10MB
constexpr static size_t maxMemoryPoolSizeInBytes_ = 100 * 1024 * 1024; // 100MB
static void clearMemoryPoolQueue(std::queue<void*>& q)
{
......@@ -103,11 +140,129 @@ namespace memory {
int pid_{-1};
};
class StaticMemPool : public IMemPool
{
public:
StaticMemPool() :
enableLogging_(ck::EnvIsEnabled(CK_ENV(CK_LOGGING))),
pid_(getpid()),
offsetInBytes_(0)
{
hip_check_error(hipHostMalloc(&pinnedMemoryBaseAddress_, memoryPoolSizeInBytes_));
if (enableLogging_)
{
std::cout << "[ MemPool ] Created memory pool with " << memoryPoolSizeInBytes_ << " bytes for process " << pid_ << std::endl;
}
}
~StaticMemPool() override
{
hip_check_error(hipHostFree(pinnedMemoryBaseAddress_));
if (enableLogging_)
{
std::cout << "[ MemPool ] Deleted pool for process " << pid_ << std::endl;
}
}
void* allocate(std::size_t sizeInBytes) override
{
std::lock_guard<std::mutex> lock(mutex_);
if (offsetInBytes_ + sizeInBytes < memoryPoolSizeInBytes_)
{
// Return new memory from the preallocated block
void* p = pinnedMemoryBaseAddress_ + offsetInBytes_;
offsetInBytes_ += sizeInBytes;
if (enableLogging_)
{
const auto pct = 100.0f * static_cast<float>(offsetInBytes_) / memoryPoolSizeInBytes_;
std::cout << "[ MemPool ] Allocation: return new memory, pinned host memory usage: " << pct << "%." << std::endl;
}
return p;
}
else if (memory_pool_.find(sizeInBytes) != memory_pool_.end() && !memory_pool_[sizeInBytes].empty())
{
// If there is a memory pool for the requested size, return memory from the pool.
if (enableLogging_)
{
std::cout << "[ MemPool ] Allocation: reusing memory from pool for size " << sizeInBytes << std::endl;
}
void* p = memory_pool_[sizeInBytes].front();
memory_pool_[sizeInBytes].pop();
return p;
}
else
{
// Try to find memory from the queue that is nearest in size.
std::pair<size_t, std::queue<void*>> nearest_queue = {std::numeric_limits<size_t>::max(), std::queue<void*>()};
for (auto& [size, q] : memory_pool_)
{
if (size > sizeInBytes && !q.empty() && size < nearest_queue.first)
{
nearest_queue = {size, q};
}
}
if (nearest_queue.first != std::numeric_limits<size_t>::max())
{
if (enableLogging_)
{
std::cout << "[ MemPool ] Allocation: reusing memory from pool for size " << nearest_queue.first <<
" to allocate " << sizeInBytes << "bytes" <<std::endl;
}
void* p = nearest_queue.second.front();
nearest_queue.second.pop();
return p;
}
// TODO: Fix the pinned host memory fragmentation. This should be fairly rare case.
if (enableLogging_)
{
std::cerr << "[ MemPool ] Internal error: memory pool exhausted." << std::endl;
}
throw std::runtime_error("Memory pool exhausted.");
}
}
void deallocate(void* p, std::size_t sizeInBytes) override
{
std::lock_guard<std::mutex> lock(mutex_);
if (memory_pool_.find(sizeInBytes) != memory_pool_.end())
{
if (enableLogging_)
{
std::cout << "[ MemPool ] Deallocate: Adding memory to pool for size " << sizeInBytes << std::endl;
}
auto& q = memory_pool_[sizeInBytes];
q.push(p);
}
else {
if (enableLogging_)
{
std::cout << "[ MemPool ] Deallocate: Creating new pool queue for size " << sizeInBytes << std::endl;
}
std::queue<void*> q;
q.push(p);
memory_pool_.insert(std::make_pair(sizeInBytes, std::move(q)));
}
}
private:
constexpr static size_t memoryPoolSizeInBytes_ = 10 * 1024 * 1024; // 10MB
std::mutex mutex_; // Mutex to protect access to the memory pool.
std::map<size_t, std::queue<void*>> memory_pool_{};
std::byte* pinnedMemoryBaseAddress_;
bool enableLogging_;
int pid_;
int offsetInBytes_;
};
class PinnedHostMemoryAllocatorBase
{
protected:
static MemPool& get_memory_pool() {
static MemPool memory_pool;
static IMemPool& get_memory_pool() {
//static DynamicMemPool memory_pool;
static StaticMemPool memory_pool;
return memory_pool;
}
};
......@@ -132,7 +287,7 @@ namespace memory {
PinnedHostMemoryAllocator() = default;
template <typename U>
PinnedHostMemoryAllocator(const PinnedHostMemoryAllocator<U>& other) : std::allocator<T>(other)
PinnedHostMemoryAllocator(const PinnedHostMemoryAllocator<U>&)
{}
T* allocate(std::size_t n) {
......@@ -163,6 +318,5 @@ namespace memory {
template <typename T, typename U>
bool operator!=(const PinnedHostMemoryAllocator<T>&, const PinnedHostMemoryAllocator<U>&) { return false; }
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment