Commit ab7855a3 authored by Ville Pietilä's avatar Ville Pietilä
Browse files

Configure selection between static and dynamic memory pools using environment variables.

parent a19235d4
...@@ -36,13 +36,13 @@ namespace memory { ...@@ -36,13 +36,13 @@ namespace memory {
pid_(getpid()) pid_(getpid())
{ {
if (enableLogging_) if (enableLogging_)
std::cout << "[ MemPool ] Created memory pool for process " << pid_ << std::endl; std::cout << "[ DynamicMemPool ] Created memory pool for process " << pid_ << std::endl;
} }
~DynamicMemPool() override ~DynamicMemPool() override
{ {
if (enableLogging_) if (enableLogging_)
std::cout << "[ MemPool ] Deleting pool for process " << pid_ << "..."<< std::endl; std::cout << "[ DynamicMemPool ] Deleting pool for process " << pid_ << "..."<< std::endl;
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
for (auto& [size, q] : memory_pool_) for (auto& [size, q] : memory_pool_)
...@@ -51,7 +51,7 @@ namespace memory { ...@@ -51,7 +51,7 @@ namespace memory {
} }
if (enableLogging_) if (enableLogging_)
std::cout << "[ MemPool ] Deleted pool for process " << pid_ << std::endl; std::cout << "[ DynamicMemPool ] Deleted pool for process " << pid_ << std::endl;
} }
void* allocate(std::size_t sizeInBytes) override void* allocate(std::size_t sizeInBytes) override
...@@ -62,7 +62,7 @@ namespace memory { ...@@ -62,7 +62,7 @@ namespace memory {
{ {
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Reusing memory from pool for size " << sizeInBytes << std::endl; std::cout << "[ DynamicMemPool ] Reusing memory from pool for size " << sizeInBytes << std::endl;
} }
void* p = memory_pool_[sizeInBytes].front(); void* p = memory_pool_[sizeInBytes].front();
memory_pool_[sizeInBytes].pop(); memory_pool_[sizeInBytes].pop();
...@@ -70,14 +70,14 @@ namespace memory { ...@@ -70,14 +70,14 @@ namespace memory {
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Total memory in pool: " << memPoolSizeInBytes_ << std::endl; std::cout << "[ DynamicMemPool ] Total memory in pool: " << memPoolSizeInBytes_ << std::endl;
} }
return p; return p;
} }
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Allocating new memory for size " << sizeInBytes << std::endl; std::cout << "[ DynamicMemPool ] Allocating new memory for size " << sizeInBytes << std::endl;
} }
void* p; void* p;
constexpr unsigned flags = hipDeviceScheduleYield; //hipDeviceScheduleSpin doesn not work, leads to freezing. constexpr unsigned flags = hipDeviceScheduleYield; //hipDeviceScheduleSpin doesn not work, leads to freezing.
...@@ -92,7 +92,7 @@ namespace memory { ...@@ -92,7 +92,7 @@ namespace memory {
{ {
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Adding memory to pool for size " << sizeInBytes << std::endl; std::cout << "[ DynamicMemPool ] Adding memory to pool for size " << sizeInBytes << std::endl;
} }
auto& q = memory_pool_[sizeInBytes]; auto& q = memory_pool_[sizeInBytes];
q.push(p); q.push(p);
...@@ -102,7 +102,7 @@ namespace memory { ...@@ -102,7 +102,7 @@ namespace memory {
{ {
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Clearing pool queue for size " << sizeInBytes << std::endl; std::cout << "[ DynamicMemPool ] Clearing pool queue for size " << sizeInBytes << std::endl;
} }
memPoolSizeInBytes_ -= sizeInBytes * q.size(); memPoolSizeInBytes_ -= sizeInBytes * q.size();
clearMemoryPoolQueue(q); clearMemoryPoolQueue(q);
...@@ -111,7 +111,7 @@ namespace memory { ...@@ -111,7 +111,7 @@ namespace memory {
else { else {
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Creating new pool queue for size " << sizeInBytes << std::endl; std::cout << "[ DynamicMemPool ] Creating new pool queue for size " << sizeInBytes << std::endl;
} }
std::queue<void*> q; std::queue<void*> q;
q.push(p); q.push(p);
...@@ -120,7 +120,7 @@ namespace memory { ...@@ -120,7 +120,7 @@ namespace memory {
} }
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Total memory in pool: " << memPoolSizeInBytes_ << std::endl; std::cout << "[ DynamicMemPool ] Total memory in pool: " << memPoolSizeInBytes_ << std::endl;
} }
} }
private: private:
...@@ -131,9 +131,7 @@ namespace memory { ...@@ -131,9 +131,7 @@ namespace memory {
while (!q.empty()) while (!q.empty())
{ {
void* p = q.front(); void* p = q.front();
q.pop(); q.pop();
// This performs an implicit hipDeviceSynchronize().
// Does this create a deadlock situation when grouped GEMM is used in distributed training with NCCL?
hip_check_error(hipHostFree(p)); hip_check_error(hipHostFree(p));
} }
} }
...@@ -151,12 +149,13 @@ namespace memory { ...@@ -151,12 +149,13 @@ namespace memory {
StaticMemPool() : StaticMemPool() :
enableLogging_(ck::EnvIsEnabled(CK_ENV(CK_LOGGING))), enableLogging_(ck::EnvIsEnabled(CK_ENV(CK_LOGGING))),
pid_(getpid()), pid_(getpid()),
offsetInBytes_(0) offsetInBytes_(0),
preferNewAllocation_(ck::EnvIsEnabled(CK_ENV(CK_PREFER_NEW_PINNED_MEM_ALLOCATION)))
{ {
hip_check_error(hipHostMalloc(&pinnedMemoryBaseAddress_, memoryPoolSizeInBytes_)); hip_check_error(hipHostMalloc(&pinnedMemoryBaseAddress_, memoryPoolSizeInBytes_));
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Created memory pool with " << memoryPoolSizeInBytes_ << " bytes for process " << pid_ << std::endl; std::cout << "[ StaticMemPool ] Created memory pool with " << memoryPoolSizeInBytes_ << " bytes for process " << pid_ << std::endl;
} }
} }
...@@ -165,7 +164,7 @@ namespace memory { ...@@ -165,7 +164,7 @@ namespace memory {
hip_check_error(hipHostFree(pinnedMemoryBaseAddress_)); hip_check_error(hipHostFree(pinnedMemoryBaseAddress_));
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Deleted pool for process " << pid_ << std::endl; std::cout << "[ StaticMemPool ] Deleted pool for process " << pid_ << std::endl;
} }
} }
...@@ -173,60 +172,27 @@ namespace memory { ...@@ -173,60 +172,27 @@ namespace memory {
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (offsetInBytes_ + sizeInBytes < memoryPoolSizeInBytes_) if (preferNewAllocation_ && offsetInBytes_ + sizeInBytes < memoryPoolSizeInBytes_)
{ {
// Return new memory from the preallocated block return allocateNewMemory(sizeInBytes);
void* p = pinnedMemoryBaseAddress_ + offsetInBytes_;
offsetInBytes_ += sizeInBytes;
if (enableLogging_)
{
const auto pct = 100.0f * static_cast<float>(offsetInBytes_) / memoryPoolSizeInBytes_;
std::cout << "[ MemPool ] Allocation: return new memory, pinned host memory usage: " << pct << "%." << std::endl;
}
return p;
} }
else if (memory_pool_.find(sizeInBytes) != memory_pool_.end() && !memory_pool_[sizeInBytes].empty())
void* ptr = tryAllocateMemoryFromPool(sizeInBytes);
if (ptr)
{ {
// If there is a memory pool for the requested size, return memory from the pool. return ptr;
if (enableLogging_)
{
std::cout << "[ MemPool ] Allocation: reusing memory from pool for size " << sizeInBytes << std::endl;
}
void* p = memory_pool_[sizeInBytes].front();
memory_pool_[sizeInBytes].pop();
return p;
} }
else
{
// Try to find memory from the queue that is nearest in size.
std::pair<size_t, std::queue<void*>> nearest_queue = {std::numeric_limits<size_t>::max(), std::queue<void*>()};
for (auto& [size, q] : memory_pool_)
{
if (size > sizeInBytes && !q.empty() && size < nearest_queue.first)
{
nearest_queue = {size, q};
}
}
if (nearest_queue.first != std::numeric_limits<size_t>::max()) if (offsetInBytes_ + sizeInBytes < memoryPoolSizeInBytes_)
{ {
if (enableLogging_) return allocateNewMemory(sizeInBytes);
{ }
std::cout << "[ MemPool ] Allocation: reusing memory from pool for size " << nearest_queue.first <<
" to allocate " << sizeInBytes << "bytes" <<std::endl;
}
void* p = nearest_queue.second.front();
nearest_queue.second.pop();
return p;
}
// TODO: Fix the pinned host memory fragmentation. This should be fairly rare case. if (enableLogging_)
if (enableLogging_) {
{ std::cerr << "[ StaticMemPool ] Memory pool exausted." << std::endl;
std::cerr << "[ MemPool ] Internal error: memory pool exhausted." << std::endl; }
} throw std::runtime_error("Memory pool exausted");
throw std::runtime_error("Memory pool exhausted.");
}
} }
void deallocate(void* p, std::size_t sizeInBytes) override void deallocate(void* p, std::size_t sizeInBytes) override
...@@ -237,7 +203,7 @@ namespace memory { ...@@ -237,7 +203,7 @@ namespace memory {
{ {
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Deallocate: Adding memory to pool for size " << sizeInBytes << std::endl; std::cout << "[ StaticMemPool ] Deallocate: Adding memory to pool for size " << sizeInBytes << std::endl;
} }
auto& q = memory_pool_[sizeInBytes]; auto& q = memory_pool_[sizeInBytes];
q.push(p); q.push(p);
...@@ -245,7 +211,7 @@ namespace memory { ...@@ -245,7 +211,7 @@ namespace memory {
else { else {
if (enableLogging_) if (enableLogging_)
{ {
std::cout << "[ MemPool ] Deallocate: Creating new pool queue for size " << sizeInBytes << std::endl; std::cout << "[ StaticMemPool ] Deallocate: Creating new pool queue for size " << sizeInBytes << std::endl;
} }
std::queue<void*> q; std::queue<void*> q;
q.push(p); q.push(p);
...@@ -260,15 +226,69 @@ namespace memory { ...@@ -260,15 +226,69 @@ namespace memory {
bool enableLogging_; bool enableLogging_;
int pid_; int pid_;
int offsetInBytes_; int offsetInBytes_;
bool preferNewAllocation_;
/// Hands out the next `sizeInBytes` bytes of the preallocated pinned block.
///
/// The returned address is the base address plus the current offset; the
/// offset is then advanced by `sizeInBytes`. This helper does not itself
/// check whether the request fits in the remaining block.
///
/// @param sizeInBytes Number of bytes to reserve from the block.
/// @return Pointer to the start of the newly reserved region.
void* allocateNewMemory(size_t sizeInBytes)
{
    void* const newBlock = pinnedMemoryBaseAddress_ + offsetInBytes_;
    offsetInBytes_ += sizeInBytes;

    if (!enableLogging_)
    {
        return newBlock;
    }

    // Report how much of the preallocated block has been consumed so far.
    const auto pct = 100.0f * static_cast<float>(offsetInBytes_) / memoryPoolSizeInBytes_;
    std::cout << "[ StaticMemPool ] Allocation: return new memory, pinned host memory usage: " << pct << "%." << std::endl;
    return newBlock;
}
/// Tries to satisfy an allocation from previously returned blocks.
///
/// Lookup order:
///   1. a queue holding blocks of exactly `sizeInBytes`;
///   2. otherwise the non-empty queue with the smallest block size that is
///      strictly larger than `sizeInBytes`.
///
/// The chosen block is removed from the pool's queue before it is returned,
/// so it cannot be handed out a second time while still in use.
///
/// @param sizeInBytes Requested allocation size in bytes.
/// @return A reusable block, or nullptr when no suitable block is pooled.
void* tryAllocateMemoryFromPool(size_t sizeInBytes)
{
    // Exact-size match: a single lookup, then pop from the pool's own queue.
    const auto exact = memory_pool_.find(sizeInBytes);
    if (exact != memory_pool_.end() && !exact->second.empty())
    {
        if (enableLogging_)
        {
            std::cout << "[ StaticMemPool ] Allocation: reusing memory from pool for size " << sizeInBytes << std::endl;
        }
        void* p = exact->second.front();
        exact->second.pop();
        return p;
    }

    // Try to find memory from the queue that is nearest in size.
    // Track the candidate by iterator rather than by copy: popping from a
    // copied queue would leave the pointer in the real pool and let the same
    // block be returned again on a later call.
    auto nearest = memory_pool_.end();
    for (auto it = memory_pool_.begin(); it != memory_pool_.end(); ++it)
    {
        const bool usable = it->first > sizeInBytes && !it->second.empty();
        if (usable && (nearest == memory_pool_.end() || it->first < nearest->first))
        {
            nearest = it;
        }
    }

    if (nearest != memory_pool_.end())
    {
        if (enableLogging_)
        {
            std::cout << "[ StaticMemPool ] Allocation: reusing memory from pool for size " << nearest->first <<
                " to allocate " << sizeInBytes << "bytes" <<std::endl;
        }
        void* p = nearest->second.front();
        nearest->second.pop();
        return p;
    }

    return nullptr;
}
}; };
class PinnedHostMemoryAllocatorBase class PinnedHostMemoryAllocatorBase
{ {
protected: protected:
static IMemPool& get_memory_pool() { static IMemPool* get_memory_pool() {
//static DynamicMemPool memory_pool; static DynamicMemPool dynamic_memory_pool;
static StaticMemPool memory_pool; static StaticMemPool static_memory_pool;
return memory_pool; static bool use_dynamic_mem_pool = ck::EnvIsEnabled(CK_ENV(CK_USE_DYNAMIC_MEM_POOL));
return use_dynamic_mem_pool ? static_cast<IMemPool*>(&dynamic_memory_pool) : static_cast<IMemPool*>(&static_memory_pool);
} }
}; };
...@@ -296,9 +316,9 @@ namespace memory { ...@@ -296,9 +316,9 @@ namespace memory {
{} {}
T* allocate(std::size_t n) { T* allocate(std::size_t n) {
auto& memory_pool = get_memory_pool(); auto* memory_pool = get_memory_pool();
const size_t sizeInBytes = n * sizeof(T); const size_t sizeInBytes = n * sizeof(T);
return static_cast<T*>(memory_pool.allocate(sizeInBytes)); return static_cast<T*>(memory_pool->allocate(sizeInBytes));
} }
void deallocate(T* p, std::size_t n) void deallocate(T* p, std::size_t n)
...@@ -310,9 +330,9 @@ namespace memory { ...@@ -310,9 +330,9 @@ namespace memory {
} }
} }
auto& memory_pool = get_memory_pool(); auto* memory_pool = get_memory_pool();
const size_t sizeInBytes = n * sizeof(T); const size_t sizeInBytes = n * sizeof(T);
memory_pool.deallocate(p, sizeInBytes); memory_pool->deallocate(p, sizeInBytes);
} }
template<typename U, typename... Args> template<typename U, typename... Args>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment