Unverified Commit cde3b2a5 authored by Olga Andreeva's avatar Olga Andreeva Committed by GitHub
Browse files

fix: enabling cuda mem pools for vectorized transfer in kvbm (#5475)


Signed-off-by: default avatarOlga Andreeva <124622579+oandreeva-nv@users.noreply.github.com>
parent 64ba7dd0
...@@ -1619,6 +1619,7 @@ dependencies = [ ...@@ -1619,6 +1619,7 @@ dependencies = [
"derive_builder", "derive_builder",
"dialoguer", "dialoguer",
"dynamo-async-openai", "dynamo-async-openai",
"dynamo-memory",
"dynamo-parsers", "dynamo-parsers",
"dynamo-runtime", "dynamo-runtime",
"either", "either",
...@@ -1683,6 +1684,22 @@ dependencies = [ ...@@ -1683,6 +1684,22 @@ dependencies = [
"zeromq", "zeromq",
] ]
[[package]]
name = "dynamo-memory"
version = "0.8.0"
dependencies = [
"anyhow",
"cudarc 0.17.8",
"dynamo-config",
"libc",
"nix 0.30.1",
"nixl-sys",
"offset-allocator",
"serde",
"thiserror 2.0.17",
"tracing",
]
[[package]] [[package]]
name = "dynamo-parsers" name = "dynamo-parsers"
version = "0.8.0" version = "0.8.0"
......
...@@ -21,7 +21,7 @@ testing-full = ["testing-cuda", "testing-nixl"] ...@@ -21,7 +21,7 @@ testing-full = ["testing-cuda", "testing-nixl"]
testing-cuda = ["dep:cudarc"] testing-cuda = ["dep:cudarc"]
testing-nixl = ["dep:nixl-sys"] testing-nixl = ["dep:nixl-sys"]
testing-etcd = [] testing-etcd = []
block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec"] block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec", "dep:dynamo-memory"]
block-manager-bench = ["block-manager", "testing-full", "dep:clap", "dep:indicatif"] block-manager-bench = ["block-manager", "testing-full", "dep:clap", "dep:indicatif"]
cuda = ["dep:cudarc"] cuda = ["dep:cudarc"]
integration = ["dynamo-runtime/integration"] integration = ["dynamo-runtime/integration"]
......
...@@ -242,7 +242,7 @@ where ...@@ -242,7 +242,7 @@ where
)?; )?;
} }
} }
} };
ctx.cuda_event(tx)?; ctx.cuda_event(tx)?;
Ok(rx) Ok(rx)
......
...@@ -6,6 +6,8 @@ use super::*; ...@@ -6,6 +6,8 @@ use super::*;
use cudarc::driver::{CudaEvent, CudaStream, sys::CUevent_flags}; use cudarc::driver::{CudaEvent, CudaStream, sys::CUevent_flags};
use nixl_sys::Agent as NixlAgent; use nixl_sys::Agent as NixlAgent;
use anyhow::Result;
use dynamo_memory::pool::CudaMemPool;
use dynamo_runtime::utils::pool::{Returnable, SyncPool, SyncPoolItem}; use dynamo_runtime::utils::pool::{Returnable, SyncPool, SyncPoolItem};
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::JoinHandle;
...@@ -13,6 +15,10 @@ use tokio::runtime::Handle; ...@@ -13,6 +15,10 @@ use tokio::runtime::Handle;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
// ============================================================================
// Legacy: Pinned Buffer Resource for Old Pooling (to be removed)
// ============================================================================
// Pinned Buffer Resource for Pooling // Pinned Buffer Resource for Pooling
#[derive(Debug)] #[derive(Debug)]
pub struct PinnedBuffer { pub struct PinnedBuffer {
...@@ -169,6 +175,10 @@ pub struct TransferContext { ...@@ -169,6 +175,10 @@ pub struct TransferContext {
stream: Arc<CudaStream>, stream: Arc<CudaStream>,
async_rt_handle: Handle, async_rt_handle: Handle,
// NEW: CUDA memory pool for stream-ordered host memory allocation
cuda_mem_pool: Option<Arc<CudaMemPool>>,
// LEGACY: Old pinned buffer pool (still used by TransferResources)
pinned_buffer_pool: Option<SyncPinnedBufferPool>, pinned_buffer_pool: Option<SyncPinnedBufferPool>,
cuda_event_tx: mpsc::UnboundedSender<(CudaEvent, oneshot::Sender<()>)>, cuda_event_tx: mpsc::UnboundedSender<(CudaEvent, oneshot::Sender<()>)>,
...@@ -182,7 +192,7 @@ impl TransferContext { ...@@ -182,7 +192,7 @@ impl TransferContext {
stream: Arc<CudaStream>, stream: Arc<CudaStream>,
async_rt_handle: Handle, async_rt_handle: Handle,
config: Option<PoolConfig>, config: Option<PoolConfig>,
) -> Self { ) -> Result<Self, anyhow::Error> {
let (cuda_event_tx, cuda_event_rx) = let (cuda_event_tx, cuda_event_rx) =
mpsc::unbounded_channel::<(CudaEvent, oneshot::Sender<()>)>(); mpsc::unbounded_channel::<(CudaEvent, oneshot::Sender<()>)>();
...@@ -190,105 +200,61 @@ impl TransferContext { ...@@ -190,105 +200,61 @@ impl TransferContext {
let cancel_token_clone = cancel_token.clone(); let cancel_token_clone = cancel_token.clone();
let cuda_event_worker = Self::setup_cuda_event_worker(cuda_event_rx, cancel_token_clone); let cuda_event_worker = Self::setup_cuda_event_worker(cuda_event_rx, cancel_token_clone);
let pool = if let Some(config) = config {
if config.enable_pool {
let pool_size = config.max_concurrent_transfers * 2 + 2;
// Calculate buffer size for worst-case scenario
// In practice, transfers can be much larger than max_transfer_batch_size
// due to direct transfer paths bypassing the batcher
let max_blocks_per_transfer = config.max_transfer_batch_size; // Conservative estimate for large transfers
let buffer_size = max_blocks_per_transfer
* config.num_outer_components
* config.num_layers
* std::mem::size_of::<u64>();
tracing::info!( let pool = {
"Creating pinned buffer pool: {} buffers × {}KB each", tracing::debug!(
pool_size, "Pinned buffer pool is no longer used for kernel transfers and will be removed in the future"
buffer_size / 1024,
); );
None
};
let total_memory_mb = (pool_size * buffer_size) / (1024 * 1024); // Create CUDA memory pool for stream-ordered allocation
tracing::info!("Total pool memory: {}MB", total_memory_mb); let cuda_mem_pool = if let Some(ref cfg) = config {
if cfg.enable_pool {
// Calculate total reserve size for pre-warming
let num_buffers = cfg.max_concurrent_transfers * 2 + 2;
let buffer_size = cfg.max_transfer_batch_size
* cfg.num_outer_components
* cfg.num_layers
* std::mem::size_of::<u64>();
let reserve_size = num_buffers * buffer_size;
{ tracing::info!(
// Create initial pinned buffers "Creating CUDA memory pool: {} buffers × {}KB = {}MB total",
let mut initial_buffers = Vec::with_capacity(pool_size); num_buffers,
let mut successful_allocations = 0; buffer_size / 1024,
reserve_size / (1024 * 1024)
for i in 0..pool_size {
let ptr =
crate::block_manager::block::transfer::cuda::allocate_pinned_memory(
buffer_size,
)
.map_err(|e| {
tracing::error!(
"Failed to allocate pinned buffer {}/{}: {}",
i + 1,
pool_size,
e
); );
e
})
.unwrap_or(0);
if ptr != 0 { let pool = CudaMemPool::builder(stream.context().clone(), reserve_size)
let buffer = PinnedBuffer { .release_threshold(128 * 1024 * 1024) // Release memory above 128MB back to OS
ptr, .build()
size: buffer_size, .map_err(|e| anyhow::anyhow!("Failed to create CUDA memory pool: {}", e))?;
id: i as u64,
};
initial_buffers.push(buffer);
successful_allocations += 1;
tracing::debug!(
"Allocated pinned buffer {}/{}: 0x{:x} ({}KB)",
i + 1,
pool_size,
ptr,
buffer_size / 1024
);
}
}
if successful_allocations == pool_size {
tracing::info!( tracing::info!(
"Successfully created pinned buffer pool: {}/{} buffers allocated", "CUDA memory pool created successfully (DEVICE memory, stream-ordered allocation, pre-warmed with {}MB)",
successful_allocations, reserve_size / (1024 * 1024)
pool_size
);
} else {
tracing::warn!(
"Partial pool creation: {}/{} buffers allocated",
successful_allocations,
pool_size
); );
} Some(Arc::new(pool))
if successful_allocations > 0 {
Some(SyncPinnedBufferPool::new_direct(initial_buffers))
} else {
tracing::error!("Failed to allocate any pinned buffers - pool disabled");
None
}
}
} else { } else {
tracing::debug!("Pinned buffer pool disabled by configuration"); tracing::debug!("CUDA memory pool disabled by configuration");
None None
} }
} else { } else {
tracing::debug!("No pool configuration provided - using fallback allocation"); tracing::debug!("No pool configuration provided - CUDA memory pool disabled");
None None
}; };
Self { Ok(Self {
nixl_agent, nixl_agent,
stream, stream,
async_rt_handle, async_rt_handle,
cuda_mem_pool,
pinned_buffer_pool: pool, pinned_buffer_pool: pool,
cuda_event_tx, cuda_event_tx,
cuda_event_worker: Some(cuda_event_worker), cuda_event_worker: Some(cuda_event_worker),
cancel_token, cancel_token,
} })
} }
fn setup_cuda_event_worker( fn setup_cuda_event_worker(
...@@ -331,6 +297,11 @@ impl TransferContext { ...@@ -331,6 +297,11 @@ impl TransferContext {
&self.async_rt_handle &self.async_rt_handle
} }
/// Get the CUDA memory pool for stream-ordered allocations
pub fn cuda_mem_pool(&self) -> Option<&Arc<CudaMemPool>> {
self.cuda_mem_pool.as_ref()
}
pub fn cuda_event(&self, tx: oneshot::Sender<()>) -> Result<(), TransferError> { pub fn cuda_event(&self, tx: oneshot::Sender<()>) -> Result<(), TransferError> {
let event = self let event = self
.stream .stream
...@@ -382,10 +353,6 @@ impl TransferContext { ...@@ -382,10 +353,6 @@ impl TransferContext {
)) ))
} }
} }
pub fn calculate_buffer_size(&self, address_count: usize) -> usize {
address_count * std::mem::size_of::<u64>()
}
} }
impl Drop for TransferContext { impl Drop for TransferContext {
......
...@@ -9,45 +9,12 @@ use crate::block_manager::block::{BlockDataProvider, BlockDataProviderMut}; ...@@ -9,45 +9,12 @@ use crate::block_manager::block::{BlockDataProvider, BlockDataProviderMut};
use anyhow::Result; use anyhow::Result;
use cudarc::driver::CudaStream; use cudarc::driver::CudaStream;
use cudarc::driver::result as cuda_result; use cudarc::driver::result as cuda_result;
use cudarc::driver::sys::{CUevent_flags, CUresult, cuMemcpyHtoDAsync_v2};
use dynamo_runtime::config::environment_names::cuda as env_cuda; use dynamo_runtime::config::environment_names::cuda as env_cuda;
use std::ops::Range; use std::ops::Range;
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::OnceLock; use std::sync::OnceLock;
/// Simple pinned memory allocation
pub fn allocate_pinned_memory(size: usize) -> Result<u64, TransferError> {
// 16-byte alignment for vectorized operations
let aligned_size = (size + 15) & !15;
if aligned_size == 0 {
return Err(TransferError::ExecutionError(
"Invalid allocation size".to_string(),
));
}
unsafe {
let result = cuda_result::malloc_host(aligned_size, 0);
match result {
Ok(ptr) => {
let ptr_value = ptr as u64;
tracing::debug!(
"Allocated pinned memory: {}KB, ptr=0x{:x}",
aligned_size / 1024,
ptr_value
);
Ok(ptr_value)
}
Err(e) => {
tracing::error!("Pinned memory allocation failed: {}", e);
Err(TransferError::ExecutionError(format!(
"Pinned memory allocation failed: {}",
e
)))
}
}
}
}
// Global storage for kernel function - store as usize to avoid Send/Sync issues // Global storage for kernel function - store as usize to avoid Send/Sync issues
static COPY_KERNEL_MODULE: Mutex<Option<usize>> = Mutex::new(None); static COPY_KERNEL_MODULE: Mutex<Option<usize>> = Mutex::new(None);
static COPY_KERNEL_FUNCTION: Mutex<Option<usize>> = Mutex::new(None); static COPY_KERNEL_FUNCTION: Mutex<Option<usize>> = Mutex::new(None);
...@@ -169,10 +136,17 @@ unsafe fn launch_copy_kernel_direct( ...@@ -169,10 +136,17 @@ unsafe fn launch_copy_kernel_direct(
}; };
if result != cudarc::driver::sys::cudaError_enum::CUDA_SUCCESS { if result != cudarc::driver::sys::cudaError_enum::CUDA_SUCCESS {
tracing::error!("Kernel launch failed: {:?}", result); tracing::error!(
"Kernel launch failed: {:?} - kernel params: {} pairs, layer_size={}, src=0x{:x}, dst=0x{:x}",
result,
address_count,
layer_size,
src_pinned_ptr,
dst_pinned_ptr
);
return Err(TransferError::ExecutionError(format!( return Err(TransferError::ExecutionError(format!(
"CUDA kernel launch failed: {:?}", "CUDA kernel launch failed: {:?} (address_count={}, layer_size={})",
result result, address_count, layer_size
))); )));
} }
...@@ -217,7 +191,7 @@ pub fn copy_blocks_with_customized_kernel<'a, Source, Destination>( ...@@ -217,7 +191,7 @@ pub fn copy_blocks_with_customized_kernel<'a, Source, Destination>(
destinations: &'a mut [Destination], destinations: &'a mut [Destination],
stream: &CudaStream, stream: &CudaStream,
ctx: &crate::block_manager::block::transfer::TransferContext, ctx: &crate::block_manager::block::transfer::TransferContext,
) -> Result<Option<(Vec<u64>, usize)>, TransferError> ) -> Result<(), TransferError>
where where
Source: BlockDataProvider, Source: BlockDataProvider,
Destination: BlockDataProviderMut, Destination: BlockDataProviderMut,
...@@ -239,35 +213,86 @@ where ...@@ -239,35 +213,86 @@ where
src_addresses.len() src_addresses.len()
); );
// Use pool-based approach with TransferResources let size = src_addresses.len() * std::mem::size_of::<u64>();
let resources = crate::block_manager::block::transfer::context::TransferResources::acquire_for_kernel_launch(
ctx,
src_addresses.len()
)?;
// Copy addresses to pinned buffers let pool = ctx.cuda_mem_pool().ok_or_else(|| {
resources.copy_addresses_to_buffers(&src_addresses, &dst_addresses)?; TransferError::ExecutionError(
"TransferContext was not instantiated with a CudaPool; please report this error"
.to_string(),
)
})?;
// Allocate DEVICE memory from pool (stream-ordered)
let src_buffer = pool.alloc_async(size, stream).map_err(|e| {
TransferError::ExecutionError(format!("CUDA pool allocation failed: {}", e))
})?;
let dst_buffer = pool.alloc_async(size, stream).map_err(|e| {
TransferError::ExecutionError(format!("CUDA pool allocation failed: {}", e))
})?;
// Copy address buffers from host to device using stream-ordered H2D memcpy
let result_src = unsafe {
cuMemcpyHtoDAsync_v2(
src_buffer,
src_addresses.as_ptr() as *const std::ffi::c_void,
size,
stream.cu_stream(),
)
};
if result_src != CUresult::CUDA_SUCCESS {
return Err(TransferError::ExecutionError(format!(
"H2D memcpy for src buffer failed: {:?}",
result_src
)));
}
tracing::debug!( let result_dst = unsafe {
" Using pooled pinned buffers: src=0x{:x}, dst=0x{:x} ({} address pairs)", cuMemcpyHtoDAsync_v2(
resources.src_ptr(), dst_buffer,
resources.dst_ptr(), dst_addresses.as_ptr() as *const std::ffi::c_void,
src_addresses.len() size,
); stream.cu_stream(),
)
};
if result_dst != CUresult::CUDA_SUCCESS {
return Err(TransferError::ExecutionError(format!(
"H2D memcpy for dst buffer failed: {:?}",
result_dst
)));
}
// Record event and synchronize to ensure H2D completes before host vectors drop
// This is critical: the async H2D memcpy is still reading from src_addresses/dst_addresses
// host memory when it returns. We must wait for completion before those vectors are dropped.
let h2d_event = stream
.record_event(Some(CUevent_flags::CU_EVENT_BLOCKING_SYNC))
.map_err(|e| TransferError::ExecutionError(format!("Failed to record H2D event: {}", e)))?;
// Launch kernel with pooled resources (addresses already copied) // Launch kernel (reads from device buffers)
unsafe { unsafe {
launch_copy_kernel_direct( launch_copy_kernel_direct(
resources.src_ptr(), src_buffer,
resources.dst_ptr(), dst_buffer,
src_addresses.len(), src_addresses.len(),
dims.layer_size, dims.layer_size,
stream, stream,
)?; )?;
} }
tracing::debug!("vectorized_copy completed - resources will be returned to pool automatically"); // Free buffers immediately (stream-ordered - CUDA ensures kernel completes first)
Ok(None) // No manual cleanup needed - TransferResources handles it via Drop pool.free_async(src_buffer, stream)
.map_err(|e| TransferError::ExecutionError(format!("Failed to free src buffer: {}", e)))?;
pool.free_async(dst_buffer, stream)
.map_err(|e| TransferError::ExecutionError(format!("Failed to free dst buffer: {}", e)))?;
// By synchronizing here, we enqueue all the work to the stream, then wait.
// There is cpu overheads associated with each of those calls.
// We might as well amortize the transfer of the pointers with those launch overheads.
h2d_event
.synchronize()
.map_err(|e| TransferError::ExecutionError(format!("Failed to sync H2D event: {}", e)))?;
Ok(())
} }
/// Copy a block from a source to a destination using CUDA memcpy /// Copy a block from a source to a destination using CUDA memcpy
......
...@@ -120,12 +120,24 @@ async fn perform_allocation_and_build_handler( ...@@ -120,12 +120,24 @@ async fn perform_allocation_and_build_handler(
num_outer_components: device_layout.config().outer_dim, num_outer_components: device_layout.config().outer_dim,
num_layers: device_layout.config().num_layers, num_layers: device_layout.config().num_layers,
}; };
let transfer_context = Arc::new(TransferContext::new( let transfer_context = Arc::new(
TransferContext::new(
Arc::new(Some(agent)), Arc::new(Some(agent)),
DeviceAllocator::new(device_id)?.ctx().new_stream()?, DeviceAllocator::new(device_id)?.ctx().new_stream()?,
Handle::current(), Handle::current(),
Some(pool_config), Some(pool_config),
)); )
.map_err(|e| {
anyhow::anyhow!(
"Failed to create transfer context for worker {} with CUDA memory pool: {}. \
This is a critical error - the worker cannot start without CUDA memory pools. \
Please ensure sufficient GPU memory is available on device {}.",
worker_id,
e,
device_id
)
})?,
);
// device // device
let device_blocks = Some(KvbmWorker::make_layout::<_, BasicMetadata>( let device_blocks = Some(KvbmWorker::make_layout::<_, BasicMetadata>(
......
...@@ -154,12 +154,22 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -154,12 +154,22 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
}; };
// We want cuda offloads to happen in parallel with host onboards, so we need to use a different stream. // We want cuda offloads to happen in parallel with host onboards, so we need to use a different stream.
let device_offload_transfer_ctx = Arc::new(TransferContext::new( let device_offload_transfer_ctx = Arc::new(
TransferContext::new(
config.nixl_agent.clone(), config.nixl_agent.clone(),
cuda_ctx.new_stream()?, cuda_ctx.new_stream()?,
config.async_rt_handle.clone(), config.async_rt_handle.clone(),
Some(pool_config), Some(pool_config.clone()),
)); )
.map_err(|e| {
anyhow::anyhow!(
"Failed to create device offload transfer context with CUDA memory pool: {}. \
This is a critical error - the system cannot operate without CUDA memory pools. \
Please ensure sufficient GPU memory is available.",
e
)
})?,
);
// Device -> Host offload // Device -> Host offload
let device_to_host_task = OffloadManager::offload_worker( let device_to_host_task = OffloadManager::offload_worker(
...@@ -192,12 +202,20 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -192,12 +202,20 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
)? )?
.detach(); .detach();
let transfer_ctx = Arc::new(TransferContext::new( let transfer_ctx = Arc::new(
TransferContext::new(
config.nixl_agent.clone(), config.nixl_agent.clone(),
cuda_ctx.new_stream()?, cuda_ctx.new_stream()?,
config.async_rt_handle.clone(), config.async_rt_handle.clone(),
None, Some(pool_config),
)); )
.map_err(|e| {
anyhow::anyhow!(
"Failed to create transfer context for host onboard operations: {}",
e
)
})?,
);
// Host -> Disk offload // Host -> Disk offload
let host_to_disk_task = OffloadManager::offload_worker( let host_to_disk_task = OffloadManager::offload_worker(
......
...@@ -239,7 +239,16 @@ impl PinnedStorage { ...@@ -239,7 +239,16 @@ impl PinnedStorage {
impl Drop for PinnedStorage { impl Drop for PinnedStorage {
fn drop(&mut self) { fn drop(&mut self) {
self.handles.release(); self.handles.release();
unsafe { cudarc::driver::result::free_host(self.ptr as _) }.unwrap(); unsafe {
if let Err(e) = cudarc::driver::result::free_host(self.ptr as _) {
tracing::error!(
"Failed to free pinned storage at 0x{:x} (size={}): {}",
self.ptr,
self.size,
e
);
}
}
} }
} }
......
...@@ -13,6 +13,7 @@ pub mod actions; ...@@ -13,6 +13,7 @@ pub mod actions;
pub mod arena; pub mod arena;
pub mod nixl; pub mod nixl;
pub mod offset; pub mod offset;
pub mod pool;
pub mod prelude; pub mod prelude;
mod device; mod device;
......
// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! CUDA memory pool for efficient device memory allocation in hot paths.
//!
//! This module provides a safe wrapper around CUDA's memory pool APIs, enabling
//! fast async allocations that avoid the overhead of cudaMalloc/cudaFree per call.
//! Memory is returned to the pool on free and reused for subsequent allocations.
use anyhow::{Result, anyhow};
use cudarc::driver::sys::{
self, CUmemAllocationType, CUmemLocationType, CUmemPool_attribute, CUmemPoolProps,
CUmemoryPool, CUresult, CUstream,
};
use cudarc::driver::{CudaContext, CudaStream};
use std::ptr;
use std::sync::{Arc, Mutex};
/// Builder for creating a CUDA memory pool with configurable parameters.
///
/// # Example
/// ```ignore
/// let pool = CudaMemPoolBuilder::new(context, 64 * 1024 * 1024) // 64 MiB reserve
/// .release_threshold(32 * 1024 * 1024) // 32 MiB release threshold
/// .build()?;
/// ```
pub struct CudaMemPoolBuilder {
/// CUDA context for the target device.
context: Arc<CudaContext>,
/// Bytes to pre-allocate to warm the pool.
reserve_size: usize,
/// Optional threshold above which memory is returned to the system on free.
release_threshold: Option<u64>,
}
impl CudaMemPoolBuilder {
/// Create a new builder with the required reserve size.
///
/// # Arguments
/// * `context` - CUDA context for the device
/// * `reserve_size` - Number of bytes to pre-allocate to warm the pool
pub fn new(context: Arc<CudaContext>, reserve_size: usize) -> Self {
Self {
context,
reserve_size,
release_threshold: None,
}
}
/// Set the release threshold for the pool.
///
/// Memory above this threshold is returned to the system when freed.
/// If not set, no release threshold is configured (CUDA default behavior).
pub fn release_threshold(mut self, threshold: u64) -> Self {
self.release_threshold = Some(threshold);
self
}
/// Build the CUDA memory pool.
///
/// This will:
/// 1. Create the pool
/// 2. Set the release threshold if configured
/// 3. Pre-allocate and free memory to warm the pool
pub fn build(self) -> Result<CudaMemPool> {
// Initialize pool properties
let mut props: CUmemPoolProps = unsafe { std::mem::zeroed() };
props.allocType = CUmemAllocationType::CU_MEM_ALLOCATION_TYPE_PINNED;
props.location.type_ = CUmemLocationType::CU_MEM_LOCATION_TYPE_DEVICE;
props.location.id = self.context.cu_device();
let mut pool: CUmemoryPool = ptr::null_mut();
// Create the pool
let result = unsafe { sys::cuMemPoolCreate(&mut pool, &props) };
if result != CUresult::CUDA_SUCCESS {
return Err(anyhow!("cuMemPoolCreate failed with error: {:?}", result));
}
// Set release threshold if configured
if let Some(threshold) = self.release_threshold {
let result = unsafe {
sys::cuMemPoolSetAttribute(
pool,
CUmemPool_attribute::CU_MEMPOOL_ATTR_RELEASE_THRESHOLD,
&threshold as *const u64 as *mut std::ffi::c_void,
)
};
if result != CUresult::CUDA_SUCCESS {
// Clean up on failure
unsafe { sys::cuMemPoolDestroy(pool) };
return Err(anyhow!(
"cuMemPoolSetAttribute failed with error: {:?}",
result
));
}
}
let cuda_pool = CudaMemPool {
inner: Mutex::new(pool),
};
// Warm the pool by pre-allocating and freeing memory
if self.reserve_size > 0 {
// Create a temporary stream for warming
let stream = self.context.new_stream()?;
// Allocate to warm the pool (using safe variant)
let ptr = cuda_pool.alloc_async(self.reserve_size, &stream)?;
// Free back to pool (memory stays reserved)
cuda_pool.free_async(ptr, &stream)?;
// Synchronize to ensure operations complete
// SAFETY: stream.cu_stream() is valid for the lifetime of `stream`
let result = unsafe { sys::cuStreamSynchronize(stream.cu_stream()) };
if result != CUresult::CUDA_SUCCESS {
return Err(anyhow!(
"cuStreamSynchronize failed with error: {:?}",
result
));
}
}
Ok(cuda_pool)
}
}
/// Safe wrapper around a CUDA memory pool.
///
/// The pool amortizes allocation overhead by maintaining a reservoir of device memory.
/// Allocations are fast sub-allocations from this reservoir, and frees return memory
/// to the pool rather than the OS (until the release threshold is exceeded).
///
/// # Thread Safety
///
/// This type uses internal locking to serialize host-side calls to CUDA driver APIs.
/// `cuMemAllocFromPoolAsync` is not host-thread reentrant, so concurrent calls from
/// multiple threads must be serialized. The GPU-side operations remain asynchronous
/// and stream-ordered.
///
/// Use [`CudaMemPoolBuilder`] for configurable pool creation with pre-allocation.
pub struct CudaMemPool {
/// Mutex protecting the pool handle for host-thread serialization.
///
/// CUDA's `cuMemAllocFromPoolAsync` does not guarantee host-thread reentrancy,
/// so all calls to the pool must be serialized on the host side.
inner: Mutex<CUmemoryPool>,
}
// SAFETY: CudaMemPool is Send because the Mutex serializes all host-side access
// to the pool handle, and CUDA driver state is thread-safe when properly serialized.
unsafe impl Send for CudaMemPool {}
// SAFETY: CudaMemPool is Sync because all access to the pool handle goes through
// the Mutex, which serializes host-thread access. The CUDA driver requires this
// serialization because cuMemAllocFromPoolAsync is not host-thread reentrant.
unsafe impl Sync for CudaMemPool {}
impl CudaMemPool {
/// Create a builder for a new CUDA memory pool.
///
/// # Arguments
/// * `context` - CUDA context for the device
/// * `reserve_size` - Number of bytes to pre-allocate to warm the pool
pub fn builder(context: Arc<CudaContext>, reserve_size: usize) -> CudaMemPoolBuilder {
CudaMemPoolBuilder::new(context, reserve_size)
}
/// Allocate memory from the pool asynchronously.
///
/// This is the safe variant that takes a `&CudaStream` reference, ensuring
/// the stream is valid for the duration of the call.
///
/// The allocation is stream-ordered; the memory is available for use
/// after all preceding operations on the stream complete.
///
/// # Host Serialization
///
/// This method acquires an internal mutex because `cuMemAllocFromPoolAsync`
/// is not host-thread reentrant. The allocation itself is stream-ordered on
/// the GPU side.
///
/// # Arguments
/// * `size` - Size in bytes to allocate
/// * `stream` - CUDA stream for async ordering
///
/// # Returns
/// Device pointer to the allocated memory
pub fn alloc_async(&self, size: usize, stream: &CudaStream) -> Result<u64> {
// SAFETY: stream.cu_stream() returns a valid handle owned by the CudaStream,
// and the borrow ensures the stream lives for the duration of this call.
unsafe { self.alloc_async_raw(size, stream.cu_stream()) }
}
/// Allocate memory from the pool asynchronously (raw stream handle variant).
///
/// This is the unsafe variant for use when you have a raw `CUstream` handle
/// from sources other than cudarc's `CudaStream`.
///
/// # Host Serialization
///
/// This method acquires an internal mutex because `cuMemAllocFromPoolAsync`
/// is not host-thread reentrant.
///
/// # Arguments
/// * `size` - Size in bytes to allocate
/// * `stream` - Raw CUDA stream handle for async ordering
///
/// # Returns
/// Device pointer to the allocated memory
///
/// # Safety
///
/// The caller must ensure that `stream` is a valid CUDA stream handle that
/// will remain valid for the duration of this call.
pub unsafe fn alloc_async_raw(&self, size: usize, stream: CUstream) -> Result<u64> {
let pool = self
.inner
.lock()
.map_err(|e| anyhow!("mutex poisoned: {}", e))?;
let mut ptr: u64 = 0;
let result = unsafe { sys::cuMemAllocFromPoolAsync(&mut ptr, size, *pool, stream) };
if result != CUresult::CUDA_SUCCESS {
return Err(anyhow!(
"cuMemAllocFromPoolAsync failed with error: {:?}",
result
));
}
Ok(ptr)
}
/// Free memory back to the pool asynchronously.
///
/// This is the safe variant that takes a `&CudaStream` reference.
///
/// The memory is returned to the pool's reservoir (not the OS) and can be
/// reused by subsequent allocations. The free is stream-ordered.
///
/// # Arguments
/// * `ptr` - Device pointer previously allocated from this pool
/// * `stream` - CUDA stream for async ordering
pub fn free_async(&self, ptr: u64, stream: &CudaStream) -> Result<()> {
// SAFETY: stream.cu_stream() returns a valid handle owned by the CudaStream,
// and the borrow ensures the stream lives for the duration of this call.
unsafe { self.free_async_raw(ptr, stream.cu_stream()) }
}
// NOTE: Unlike alloc_async_raw, this method does NOT acquire the pool mutex.
// The mutex in alloc_async_raw ensures each allocation returns a unique pointer.
// cuMemFreeAsync only enqueues a stream-ordered free operation for that unique
// pointer - multiple threads can safely enqueue frees for different unique pointers
// concurrently. The actual return-to-pool happens asynchronously on the GPU side.
/// Free memory back to the pool asynchronously (raw stream handle variant).
///
/// This is the unsafe variant for use when you have a raw `CUstream` handle.
///
/// The memory is returned to the pool's reservoir (not the OS) and can be
/// reused by subsequent allocations. The free is stream-ordered.
///
/// # Arguments
/// * `ptr` - Device pointer previously allocated from this pool
/// * `stream` - Raw CUDA stream handle for async ordering
///
/// # Safety
///
/// The caller must ensure that:
/// - `ptr` is a valid device pointer previously allocated from this pool
/// - `stream` is a valid CUDA stream handle
pub unsafe fn free_async_raw(&self, ptr: u64, stream: CUstream) -> Result<()> {
let result = unsafe { sys::cuMemFreeAsync(ptr, stream) };
if result != CUresult::CUDA_SUCCESS {
return Err(anyhow!("cuMemFreeAsync failed with error: {:?}", result));
}
Ok(())
}
}
impl Drop for CudaMemPool {
fn drop(&mut self) {
// No need to lock - we have &mut self so exclusive access is guaranteed
let pool = self
.inner
.get_mut()
.expect("mutex should not be poisoned during drop");
// Destroy the pool, releasing all memory back to the system
let result = unsafe { sys::cuMemPoolDestroy(*pool) };
if result != CUresult::CUDA_SUCCESS {
tracing::warn!("cuMemPoolDestroy failed with error: {:?}", result);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pool_creation_with_builder() {
// Skip if no CUDA device available
let context = match CudaContext::new(0) {
Ok(ctx) => ctx,
Err(e) => {
eprintln!("Skipping test - no CUDA device: {:?}", e);
return;
}
};
// Test builder with reserve size and release threshold
let result = CudaMemPool::builder(context.clone(), 1024 * 1024) // 1 MiB reserve
.release_threshold(64 * 1024 * 1024) // 64 MiB threshold
.build();
if result.is_err() {
eprintln!("Skipping test - pool creation failed: {:?}", result.err());
return;
}
let pool = result.unwrap();
drop(pool);
}
#[test]
fn test_pool_creation_no_threshold() {
// Skip if no CUDA device available
let context = match CudaContext::new(0) {
Ok(ctx) => ctx,
Err(e) => {
eprintln!("Skipping test - no CUDA device: {:?}", e);
return;
}
};
// Test builder without release threshold
let result = CudaMemPool::builder(context, 0).build();
if result.is_err() {
eprintln!("Skipping test - pool creation failed: {:?}", result.err());
return;
}
let pool = result.unwrap();
drop(pool);
}
}
// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Memory pool for efficient device memory allocation in hot paths.
pub mod cuda;
pub use cuda::{CudaMemPool, CudaMemPoolBuilder};
...@@ -306,7 +306,7 @@ class DeterminismTester(ApiTester): ...@@ -306,7 +306,7 @@ class DeterminismTester(ApiTester):
with open(self.shakespeare_file, "w", encoding="utf-8") as f: with open(self.shakespeare_file, "w", encoding="utf-8") as f:
f.write(content) f.write(content)
# Inherited from ApiTester, but override to add top_p for determinism testing # Inherited from ApiTester, but override to add determinism-specific parameters
def make_request( def make_request(
self, self,
content: str, content: str,
...@@ -322,12 +322,18 @@ class DeterminismTester(ApiTester): ...@@ -322,12 +322,18 @@ class DeterminismTester(ApiTester):
if seed == 42: # Default seed, use env override if seed == 42: # Default seed, use env override
seed = int(os.environ.get("KVBM_SEED", "42")) seed = int(os.environ.get("KVBM_SEED", "42"))
top_k = -1
if check_module_available("tensorrt_llm"):
top_k = 0
# For determinism: use temperature=0 which should trigger greedy decoding in vLLM
# Setting top_p=1.0 and top_k=-1 to avoid any sampling/filtering
return super().make_request( return super().make_request(
content, content,
max_tokens=max_tokens, max_tokens=max_tokens,
temperature=temperature, temperature=temperature,
seed=seed, seed=seed,
top_p=0.0001, # For determinism top_p=1.0, # No nucleus sampling filtering
top_k=top_k, # No top-k filtering
**kwargs, **kwargs,
) )
......
...@@ -449,10 +449,6 @@ class TestDeterminismAgg(BaseTestDeterminism): ...@@ -449,10 +449,6 @@ class TestDeterminismAgg(BaseTestDeterminism):
@pytest.mark.skipif( @pytest.mark.skipif(
not HAS_VLLM_BENCH, reason="requires vllm bench (vllm module not found)" not HAS_VLLM_BENCH, reason="requires vllm bench (vllm module not found)"
) )
@pytest.mark.xfail(
reason="Known issue, fixed in PR: https://github.com/ai-dynamo/dynamo/pull/5475",
run=True,
)
def test_concurrent_determinism_under_load( def test_concurrent_determinism_under_load(
self, tester, llm_server, runtime_services self, tester, llm_server, runtime_services
): ):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment