Unverified Commit 2d39f1b1 authored by Olga Andreeva's avatar Olga Andreeva Committed by GitHub
Browse files

feat: KVBM connector : enabling vectorized copy from pinned memory to device...


feat: KVBM connector : enabling vectorized copy from pinned memory to device memory and vice versa (#2989)
Signed-off-by: default avatarOlga Andreeva <oandreeva@nvidia.com>
Signed-off-by: default avataroandreeva-nv <oandreeva-nv@nvidia.com>
Co-authored-by: default avatarZiqi Fan <ziqif@nvidia.com>
Co-authored-by: default avataroandreeva-nv <oandreeva-nv@nvidia.com>
parent 5abea1be
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
*.[Pp][Nn][Gg] binary *.[Pp][Nn][Gg] binary
*.[Zz][Ii][Pp] binary *.[Zz][Ii][Pp] binary
*.[Tt][Gg][Zz] binary *.[Tt][Gg][Zz] binary
*.fatbin binary
# Exclude test data files from linguist language detection # Exclude test data files from linguist language detection
lib/llm/tests/data/** linguist-vendored lib/llm/tests/data/** linguist-vendored
......
...@@ -5650,9 +5650,9 @@ dependencies = [ ...@@ -5650,9 +5650,9 @@ dependencies = [
[[package]] [[package]]
name = "potential_utf" name = "potential_utf"
version = "0.1.2" version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" checksum = "84df19adbe5b5a0782edcab45899906947ab039ccf4573713735ee7de1e6b08a"
dependencies = [ dependencies = [
"zerovec", "zerovec",
] ]
......
...@@ -218,7 +218,7 @@ impl Leader for KvConnectorLeader { ...@@ -218,7 +218,7 @@ impl Leader for KvConnectorLeader {
); );
if slot.state() == SlotState::SkippedPrefill || slot.state() == SlotState::SkippedDecode { if slot.state() == SlotState::SkippedPrefill || slot.state() == SlotState::SkippedDecode {
tracing::warn!("slot is in the SkippedPrefill or SkippedDecode state; will resume from skipped and return early"); tracing::debug!("slot is in the SkippedPrefill or SkippedDecode state; will resume from skipped and return early");
match slot.state() { match slot.state() {
SlotState::SkippedPrefill => { SlotState::SkippedPrefill => {
slot.mark_as_prefilling(self.iteration_counter)?; slot.mark_as_prefilling(self.iteration_counter)?;
......
...@@ -398,7 +398,7 @@ impl VllmConnectorSlot { ...@@ -398,7 +398,7 @@ impl VllmConnectorSlot {
SlotState::SkippedPrefill => Ok(()), // already skipped SlotState::SkippedPrefill => Ok(()), // already skipped
SlotState::SkippedDecode => Ok(()), // already skipped SlotState::SkippedDecode => Ok(()), // already skipped
_ => { _ => {
tracing::warn!("slot is in the {:?} state; will not explicitly mark as skipped, request_id: {}", self.state, self.request_id); tracing::debug!("slot is in the {:?} state; will not explicitly mark as skipped, request_id: {}", self.state, self.request_id);
Ok(()) Ok(())
} }
} }
......
...@@ -64,9 +64,7 @@ class KvConnectorWorker: ...@@ -64,9 +64,7 @@ class KvConnectorWorker:
Args: kv_caches: Args: kv_caches:
dictionary of layer names, kv cache dictionary of layer names, kv cache
""" """
print(
f"KvConnectorWorker.register_kv_caches called with {len(kv_caches)} kv_caches"
)
cache_config = self.vllm_config.cache_config cache_config = self.vllm_config.cache_config
# Create ordered list of (layer_name, tensor) tuples sorted by layer index # Create ordered list of (layer_name, tensor) tuples sorted by layer index
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::env;
use std::path::PathBuf;
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
// Declare our custom cfg flag to avoid unexpected_cfgs warnings
println!("cargo:rustc-check-cfg=cfg(have_vec_copy_fatbin)");
println!("cargo:warning=Building with CUDA KV off"); println!("cargo:warning=Building with CUDA KV off");
build_protos() build_protos()?;
// Get FATBIN path and copy it to OUT_DIR for embedding
if let Some(fatbin_path) = find_fatbin_file() {
// Copy FATBIN to OUT_DIR so we can include it with a predictable path
let out_dir = env::var("OUT_DIR").unwrap();
let dest_path = PathBuf::from(out_dir).join("vectorized_copy.fatbin");
if let Err(e) = std::fs::copy(&fatbin_path, &dest_path) {
println!("cargo:warning=Failed to copy FATBIN to OUT_DIR: {}", e);
} else {
// Emit cfg flag for conditional compilation
println!("cargo:rustc-cfg=have_vec_copy_fatbin");
println!(
"cargo:warning=CUDA FATBIN found at: {} - copied to OUT_DIR",
fatbin_path.display()
);
}
// Tell cargo to rerun if FATBIN file changes
println!("cargo:rerun-if-changed={}", fatbin_path.display());
} else {
println!(
"cargo:warning=CUDA FATBIN not found - run 'make fatbin' in cuda_kernels directory"
);
println!("cargo:warning=Set DYNAMO_FATBIN_PATH env var to specify custom location");
}
// Rerun build if environment variable changes
println!("cargo:rerun-if-env-changed=DYNAMO_FATBIN_PATH");
Ok(())
} }
fn build_protos() -> Result<(), Box<dyn std::error::Error>> { fn build_protos() -> Result<(), Box<dyn std::error::Error>> {
...@@ -11,6 +48,43 @@ fn build_protos() -> Result<(), Box<dyn std::error::Error>> { ...@@ -11,6 +48,43 @@ fn build_protos() -> Result<(), Box<dyn std::error::Error>> {
Ok(()) Ok(())
} }
fn find_fatbin_file() -> Option<PathBuf> {
// 1. Check if user specified custom path via environment variable
if let Ok(custom_path) = env::var("DYNAMO_FATBIN_PATH") {
let fatbin_file = PathBuf::from(custom_path);
if fatbin_file.exists() {
println!(
"cargo:warning=Using custom FATBIN path: {}",
fatbin_file.display()
);
return Some(fatbin_file);
} else {
println!(
"cargo:warning=Custom FATBIN path does not exist: {}",
fatbin_file.display()
);
}
}
// 2. Check standard locations (priority order)
let default_paths = [
"./src/block_manager/block/transfer/kernels/vectorized_copy.fatbin", // Primary: Next to transfer module
];
for path in &default_paths {
let fatbin_file = PathBuf::from(path);
if fatbin_file.exists() {
println!(
"cargo:warning=Found FATBIN at default location: {}",
fatbin_file.display()
);
return Some(fatbin_file);
}
}
None
}
// NOTE: Preserving this build.rs for reference. We may want to re-enable // NOTE: Preserving this build.rs for reference. We may want to re-enable
// custom kernel compilation in the future. // custom kernel compilation in the future.
......
...@@ -14,8 +14,6 @@ use crate::block_manager::storage::{ ...@@ -14,8 +14,6 @@ use crate::block_manager::storage::{
nixl::{NixlRegisterableStorage, NixlStorage}, nixl::{NixlRegisterableStorage, NixlStorage},
}; };
use cudarc::driver::CudaStream;
use nixl_sys::NixlDescriptor; use nixl_sys::NixlDescriptor;
use nixl_sys::XferOp::{Read, Write}; use nixl_sys::XferOp::{Read, Write};
use std::ops::Range; use std::ops::Range;
...@@ -23,7 +21,7 @@ use tokio::sync::oneshot; ...@@ -23,7 +21,7 @@ use tokio::sync::oneshot;
pub use crate::block_manager::storage::{CudaAccessible, Local, Remote}; pub use crate::block_manager::storage::{CudaAccessible, Local, Remote};
pub use async_trait::async_trait; pub use async_trait::async_trait;
pub use context::TransferContext; pub use context::{PoolConfig, TransferContext};
/// A block that can be the target of a write /// A block that can be the target of a write
pub trait Writable {} pub trait Writable {}
...@@ -82,6 +80,14 @@ impl NixlTransfer { ...@@ -82,6 +80,14 @@ impl NixlTransfer {
} }
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CudaTransferMode {
/// Use the custom CUDA kernel for G1 <-> G2 transfers
Custom,
/// Use the default CUDA async memcpy for G1 <-> G2 transfers
Default,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferStrategy { pub enum TransferStrategy {
Memcpy, Memcpy,
...@@ -135,6 +141,33 @@ where ...@@ -135,6 +141,33 @@ where
} }
} }
#[inline]
fn resolve_cuda_transfer_mode(
base_strategy: TransferStrategy,
is_contiguous: bool,
) -> CudaTransferMode {
match base_strategy {
TransferStrategy::CudaAsyncH2D => {
if is_contiguous {
CudaTransferMode::Default
} else {
CudaTransferMode::Custom
}
}
TransferStrategy::CudaAsyncD2H => {
if is_contiguous {
CudaTransferMode::Default
} else {
CudaTransferMode::Custom
}
}
other => panic!(
"resolve_cuda_strategy called with non-CUDA strategy: {:?}",
other
),
}
}
pub fn handle_local_transfer<RB, WB>( pub fn handle_local_transfer<RB, WB>(
sources: &[RB], sources: &[RB],
targets: &mut [WB], targets: &mut [WB],
...@@ -162,13 +195,52 @@ where ...@@ -162,13 +195,52 @@ where
TransferStrategy::CudaAsyncH2D TransferStrategy::CudaAsyncH2D
| TransferStrategy::CudaAsyncD2H | TransferStrategy::CudaAsyncD2H
| TransferStrategy::CudaAsyncD2D => { | TransferStrategy::CudaAsyncD2D => {
tracing::debug!(
"Transfer: Using CUDA strategy: {:?}",
RB::write_to_strategy()
);
if RB::write_to_strategy() == TransferStrategy::CudaAsyncH2D
|| RB::write_to_strategy() == TransferStrategy::CudaAsyncD2H
{
let is_contiguous = sources[0].block_data().is_fully_contiguous()
&& targets[0].block_data().is_fully_contiguous();
let transfer_mode =
resolve_cuda_transfer_mode(RB::write_to_strategy(), is_contiguous);
match transfer_mode {
CudaTransferMode::Custom => {
let selected_stream = ctx.stream();
cuda::copy_blocks_with_customized_kernel(
sources,
targets,
selected_stream.as_ref(),
&ctx,
)?;
}
CudaTransferMode::Default => {
for (src, dst) in sources.iter().zip(targets.iter_mut()) { for (src, dst) in sources.iter().zip(targets.iter_mut()) {
cuda::copy_block(src, dst, ctx.stream().as_ref(), RB::write_to_strategy())?; cuda::copy_block(
src,
dst,
ctx.stream().as_ref(),
RB::write_to_strategy(),
)?;
}
}
} }
ctx.cuda_event(tx)?;
Ok(rx)
} else {
// Fall back to individual copy for D2Dblocks
for (src, dst) in sources.iter().zip(targets.iter_mut()) {
cuda::copy_block(src, dst, ctx.stream().as_ref(), RB::write_to_strategy())?;
}
ctx.cuda_event(tx)?; ctx.cuda_event(tx)?;
Ok(rx) Ok(rx)
} }
}
TransferStrategy::Nixl(transfer_type) => { TransferStrategy::Nixl(transfer_type) => {
let transfer_fut = nixl::write_blocks_to(sources, targets, &ctx, transfer_type)?; let transfer_fut = nixl::write_blocks_to(sources, targets, &ctx, transfer_type)?;
......
...@@ -6,17 +6,171 @@ use super::*; ...@@ -6,17 +6,171 @@ use super::*;
use cudarc::driver::{CudaEvent, CudaStream, sys::CUevent_flags}; use cudarc::driver::{CudaEvent, CudaStream, sys::CUevent_flags};
use nixl_sys::Agent as NixlAgent; use nixl_sys::Agent as NixlAgent;
use dynamo_runtime::utils::pool::{Returnable, SyncPool, SyncPoolItem};
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
// Pinned Buffer Resource for Pooling
#[derive(Debug)]
pub struct PinnedBuffer {
pub ptr: u64,
pub size: usize,
pub id: u64,
}
impl Returnable for PinnedBuffer {
fn on_return(&mut self) {
tracing::debug!(
"Returning pinned buffer {} ({}KB) to pool",
self.id,
self.size / 1024
);
}
}
impl Drop for PinnedBuffer {
fn drop(&mut self) {
tracing::debug!(
"Dropping pinned buffer {} ({}KB) - freeing CUDA pinned memory",
self.id,
self.size / 1024
);
unsafe {
if let Err(e) = cudarc::driver::result::free_host(self.ptr as *mut std::ffi::c_void) {
tracing::error!(
"Failed to free pinned buffer {} (0x{:x}): {}",
self.id,
self.ptr,
e
);
}
}
}
}
pub type SyncPinnedBufferPool = SyncPool<PinnedBuffer>;
pub struct TransferResources {
src_buffer: SyncPoolItem<PinnedBuffer>,
dst_buffer: SyncPoolItem<PinnedBuffer>,
}
impl TransferResources {
/// Create TransferResources by acquiring 2 buffers from the context
pub fn acquire_for_kernel_launch(
ctx: &TransferContext,
address_count: usize,
) -> Result<Self, TransferError> {
tracing::debug!(
"Acquiring TransferResources for {} addresses (need 2 buffers)",
address_count
);
// Acquire 2 buffers: one for src addresses, one for dst addresses
let src_buffer = ctx.acquire_resources_for_transfer_sync(address_count)?;
let dst_buffer = ctx.acquire_resources_for_transfer_sync(address_count)?;
tracing::debug!(
"TransferResources ready: src=0x{:x}, dst=0x{:x}",
src_buffer.ptr,
dst_buffer.ptr
);
Ok(Self {
src_buffer,
dst_buffer,
})
}
/// Copy address arrays into the pinned buffers
pub fn copy_addresses_to_buffers(
&self,
src_addresses: &[u64],
dst_addresses: &[u64],
) -> Result<(), TransferError> {
// Returns (), not pointers
if src_addresses.len() != dst_addresses.len() {
return Err(TransferError::ExecutionError(format!(
"Address array length mismatch: src={}, dst={}",
src_addresses.len(),
dst_addresses.len()
)));
}
let required_size = std::mem::size_of_val(src_addresses);
// Check buffer sizes
if self.src_buffer.size < required_size || self.dst_buffer.size < required_size {
return Err(TransferError::ExecutionError(format!(
"Buffer too small: {}B needed",
required_size
)));
}
// Copy addresses to pinned buffers
unsafe {
std::ptr::copy_nonoverlapping(
src_addresses.as_ptr(),
self.src_buffer.ptr as *mut u64,
src_addresses.len(),
);
std::ptr::copy_nonoverlapping(
dst_addresses.as_ptr(),
self.dst_buffer.ptr as *mut u64,
dst_addresses.len(),
);
}
tracing::debug!(
"Copied {} address pairs to pinned buffers",
src_addresses.len()
);
Ok(())
}
/// Get the source buffer pointer (for kernel launch)
pub fn src_ptr(&self) -> u64 {
self.src_buffer.ptr
}
/// Get the destination buffer pointer (for kernel launch)
pub fn dst_ptr(&self) -> u64 {
self.dst_buffer.ptr
}
}
impl Drop for TransferResources {
fn drop(&mut self) {
tracing::debug!(
"Releasing TransferResources: buffers {} & {} returning to pool",
self.src_buffer.id,
self.dst_buffer.id
);
// SyncPoolItem Drop handles returning buffers to pool automatically
}
}
#[derive(Debug, Clone)]
pub struct PoolConfig {
pub enable_pool: bool,
pub max_concurrent_transfers: usize,
pub max_transfer_batch_size: usize,
pub num_outer_components: usize,
pub num_layers: usize,
}
pub struct TransferContext { pub struct TransferContext {
nixl_agent: Arc<Option<NixlAgent>>, nixl_agent: Arc<Option<NixlAgent>>,
stream: Arc<CudaStream>, stream: Arc<CudaStream>,
async_rt_handle: Handle, async_rt_handle: Handle,
pinned_buffer_pool: Option<SyncPinnedBufferPool>,
cuda_event_tx: mpsc::UnboundedSender<(CudaEvent, oneshot::Sender<()>)>, cuda_event_tx: mpsc::UnboundedSender<(CudaEvent, oneshot::Sender<()>)>,
cuda_event_worker: Option<JoinHandle<()>>, cuda_event_worker: Option<JoinHandle<()>>,
cancel_token: CancellationToken, cancel_token: CancellationToken,
...@@ -27,14 +181,121 @@ impl TransferContext { ...@@ -27,14 +181,121 @@ impl TransferContext {
nixl_agent: Arc<Option<NixlAgent>>, nixl_agent: Arc<Option<NixlAgent>>,
stream: Arc<CudaStream>, stream: Arc<CudaStream>,
async_rt_handle: Handle, async_rt_handle: Handle,
config: Option<PoolConfig>,
) -> Self { ) -> Self {
let (cuda_event_tx, mut cuda_event_rx) = let (cuda_event_tx, cuda_event_rx) =
mpsc::unbounded_channel::<(CudaEvent, oneshot::Sender<()>)>(); mpsc::unbounded_channel::<(CudaEvent, oneshot::Sender<()>)>();
let cancel_token = CancellationToken::new(); let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone(); let cancel_token_clone = cancel_token.clone();
let cuda_event_worker = std::thread::spawn(move || { let cuda_event_worker = Self::setup_cuda_event_worker(cuda_event_rx, cancel_token_clone);
let pool = if let Some(config) = config {
if config.enable_pool {
let pool_size = config.max_concurrent_transfers * 2 + 2;
// Calculate buffer size for worst-case scenario
// In practice, transfers can be much larger than max_transfer_batch_size
// due to direct transfer paths bypassing the batcher
let max_blocks_per_transfer = config.max_transfer_batch_size; // Conservative estimate for large transfers
let buffer_size = max_blocks_per_transfer
* config.num_outer_components
* config.num_layers
* std::mem::size_of::<u64>();
tracing::info!(
"Creating pinned buffer pool: {} buffers × {}KB each",
pool_size,
buffer_size / 1024,
);
let total_memory_mb = (pool_size * buffer_size) / (1024 * 1024);
tracing::info!("Total pool memory: {}MB", total_memory_mb);
{
// Create initial pinned buffers
let mut initial_buffers = Vec::with_capacity(pool_size);
let mut successful_allocations = 0;
for i in 0..pool_size {
let ptr =
crate::block_manager::block::transfer::cuda::allocate_pinned_memory(
buffer_size,
)
.map_err(|e| {
tracing::error!(
"Failed to allocate pinned buffer {}/{}: {}",
i + 1,
pool_size,
e
);
e
})
.unwrap_or(0);
if ptr != 0 {
let buffer = PinnedBuffer {
ptr,
size: buffer_size,
id: i as u64,
};
initial_buffers.push(buffer);
successful_allocations += 1;
tracing::debug!(
"Allocated pinned buffer {}/{}: 0x{:x} ({}KB)",
i + 1,
pool_size,
ptr,
buffer_size / 1024
);
}
}
if successful_allocations == pool_size {
tracing::info!(
"Successfully created pinned buffer pool: {}/{} buffers allocated",
successful_allocations,
pool_size
);
} else {
tracing::warn!(
"Partial pool creation: {}/{} buffers allocated",
successful_allocations,
pool_size
);
}
if successful_allocations > 0 {
Some(SyncPinnedBufferPool::new_direct(initial_buffers))
} else {
tracing::error!("Failed to allocate any pinned buffers - pool disabled");
None
}
}
} else {
tracing::debug!("Pinned buffer pool disabled by configuration");
None
}
} else {
tracing::debug!("No pool configuration provided - using fallback allocation");
None
};
Self {
nixl_agent,
stream,
async_rt_handle,
pinned_buffer_pool: pool,
cuda_event_tx,
cuda_event_worker: Some(cuda_event_worker),
cancel_token,
}
}
fn setup_cuda_event_worker(
mut cuda_event_rx: mpsc::UnboundedReceiver<(CudaEvent, oneshot::Sender<()>)>,
cancel_token: CancellationToken,
) -> JoinHandle<()> {
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread() let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build()
...@@ -49,22 +310,13 @@ impl TransferContext { ...@@ -49,22 +310,13 @@ impl TransferContext {
} }
let _ = tx.send(()); let _ = tx.send(());
} }
_ = cancel_token_clone.cancelled() => { _ = cancel_token.cancelled() => {
break; break;
} }
} }
} }
}); });
}); })
Self {
nixl_agent,
stream,
async_rt_handle,
cuda_event_tx,
cuda_event_worker: Some(cuda_event_worker),
cancel_token,
}
} }
pub fn nixl_agent(&self) -> Arc<Option<NixlAgent>> { pub fn nixl_agent(&self) -> Arc<Option<NixlAgent>> {
...@@ -90,6 +342,50 @@ impl TransferContext { ...@@ -90,6 +342,50 @@ impl TransferContext {
.map_err(|_| TransferError::ExecutionError("CUDA event worker exited.".into()))?; .map_err(|_| TransferError::ExecutionError("CUDA event worker exited.".into()))?;
Ok(()) Ok(())
} }
pub fn acquire_resources_for_transfer_sync(
&self,
size: usize,
) -> Result<SyncPoolItem<PinnedBuffer>, TransferError> {
let ptr_array_size = size * std::mem::size_of::<u64>();
tracing::debug!(
"Acquiring pinned buffer: need {} bytes for {} addresses",
ptr_array_size,
size
);
if let Some(pool) = &self.pinned_buffer_pool {
tracing::debug!("Pool available - acquiring buffer (blocking)...");
// All buffers are the same size, so just acquire one directly
let buffer = pool.acquire_blocking();
// Validate that the requested size fits in the buffer
if buffer.size < ptr_array_size {
return Err(TransferError::ExecutionError(format!(
"Buffer too small: need {}KB but buffer is only {}KB (addresses: {})",
ptr_array_size / 1024,
buffer.size / 1024,
size
)));
}
Ok(buffer)
} else {
tracing::warn!(
"No pinned buffer pool configured - this should not happen in production"
);
// No pool configured - this is a configuration error
Err(TransferError::ExecutionError(
"No sync pool configured - TransferContext must be created with a pool".into(),
))
}
}
pub fn calculate_buffer_size(&self, address_count: usize) -> usize {
address_count * std::mem::size_of::<u64>()
}
} }
impl Drop for TransferContext { impl Drop for TransferContext {
......
...@@ -4,10 +4,52 @@ ...@@ -4,10 +4,52 @@
use super::*; use super::*;
use super::TransferError; use super::TransferError;
use crate::block_manager::storage::{DeviceStorage, PinnedStorage}; use crate::block_manager::block::{BlockDataProvider, BlockDataProviderMut};
use anyhow::Result; use anyhow::Result;
use cudarc::driver::CudaStream;
use cudarc::driver::result as cuda_result; use cudarc::driver::result as cuda_result;
use std::ops::Range; use std::ops::Range;
use std::sync::Mutex;
use std::sync::OnceLock;
/// Simple pinned memory allocation
pub fn allocate_pinned_memory(size: usize) -> Result<u64, TransferError> {
// 16-byte alignment for vectorized operations
let aligned_size = (size + 15) & !15;
if aligned_size == 0 {
return Err(TransferError::ExecutionError(
"Invalid allocation size".to_string(),
));
}
unsafe {
let result = cuda_result::malloc_host(aligned_size, 0);
match result {
Ok(ptr) => {
let ptr_value = ptr as u64;
tracing::debug!(
"Allocated pinned memory: {}KB, ptr=0x{:x}",
aligned_size / 1024,
ptr_value
);
Ok(ptr_value)
}
Err(e) => {
tracing::error!("Pinned memory allocation failed: {}", e);
Err(TransferError::ExecutionError(format!(
"Pinned memory allocation failed: {}",
e
)))
}
}
}
}
// Global storage for kernel function - store as usize to avoid Send/Sync issues
static COPY_KERNEL_MODULE: Mutex<Option<usize>> = Mutex::new(None);
static COPY_KERNEL_FUNCTION: Mutex<Option<usize>> = Mutex::new(None);
type CudaMemcpyFnPtr = unsafe fn( type CudaMemcpyFnPtr = unsafe fn(
src_ptr: *const u8, src_ptr: *const u8,
...@@ -27,6 +69,206 @@ fn cuda_memcpy_fn_ptr(strategy: &TransferStrategy) -> Result<CudaMemcpyFnPtr, Tr ...@@ -27,6 +69,206 @@ fn cuda_memcpy_fn_ptr(strategy: &TransferStrategy) -> Result<CudaMemcpyFnPtr, Tr
} }
} }
/// Collect K/V cache addresses from source and destination blocks
fn collect_kv_addresses<Source, Destination>(
sources: &[Source],
destinations: &[Destination],
num_layers: usize,
num_outer_dims: usize,
) -> Result<(Vec<u64>, Vec<u64>), TransferError>
where
Source: BlockDataProvider,
Destination: BlockDataProviderMut,
{
if sources.is_empty() {
return Err(TransferError::ExecutionError(
"No source blocks provided".to_string(),
));
}
let total_address_pairs = sources.len() * num_layers * num_outer_dims;
let mut src_addresses = Vec::with_capacity(total_address_pairs);
let mut dst_addresses = Vec::with_capacity(total_address_pairs);
let src_block_data: Vec<_> = sources.iter().map(|block| block.block_data()).collect();
let dst_block_data: Vec<_> = destinations
.iter()
.map(|block| block.block_data())
.collect();
for (src_data, dst_data) in src_block_data.iter().zip(dst_block_data.iter()) {
for layer_idx in 0..num_layers {
for outer_idx in 0..num_outer_dims {
let src_view = src_data.layer_view(layer_idx, outer_idx)?;
let dst_view = dst_data.layer_view(layer_idx, outer_idx)?;
unsafe {
src_addresses.push(src_view.as_ptr() as u64);
dst_addresses.push(dst_view.as_ptr() as u64);
}
}
}
}
Ok((src_addresses, dst_addresses))
}
/// Launch CUDA kernel directly with pinned buffer pointers (no address copying)
unsafe fn launch_copy_kernel_direct(
src_pinned_ptr: u64,
dst_pinned_ptr: u64,
address_count: usize,
layer_size: usize,
stream: &CudaStream,
) -> Result<(), TransferError> {
// Get kernel function
let kernel = get_copy_kernel()?;
tracing::debug!(
"LAUNCHING KERNEL: {} pairs, src=0x{:x}, dst=0x{:x}",
address_count,
src_pinned_ptr,
dst_pinned_ptr
);
let threads_per_block = 256u32;
let max_blocks = 1024u32;
let blocks_needed = std::cmp::min(max_blocks, address_count as u32);
let grid_dim = (blocks_needed, 1, 1);
let block_dim = (threads_per_block, 1, 1);
// cuLaunchKernel expects pointers to parameter values
let src_ptr_param = src_pinned_ptr;
let dst_ptr_param = dst_pinned_ptr;
let size_param = layer_size;
let num_pairs_param = address_count as i32;
let params = [
&src_ptr_param as *const _ as *mut std::ffi::c_void,
&dst_ptr_param as *const _ as *mut std::ffi::c_void,
&size_param as *const _ as *mut std::ffi::c_void,
&num_pairs_param as *const _ as *mut std::ffi::c_void,
];
let result = unsafe {
cudarc::driver::sys::cuLaunchKernel(
kernel,
grid_dim.0,
grid_dim.1,
grid_dim.2,
block_dim.0,
block_dim.1,
block_dim.2,
0, // shared memory
stream.cu_stream(),
params.as_ptr() as *mut *mut std::ffi::c_void,
std::ptr::null_mut(), // extra
)
};
if result != cudarc::driver::sys::cudaError_enum::CUDA_SUCCESS {
tracing::error!("Kernel launch failed: {:?}", result);
return Err(TransferError::ExecutionError(format!(
"CUDA kernel launch failed: {:?}",
result
)));
}
Ok(())
}
#[derive(Clone, Copy, Debug)]
struct CachedBlockDimensions {
num_layers: usize,
num_outer_dims: usize,
layer_size: usize,
}
static BLOCK_DIMENSIONS_CACHE: OnceLock<CachedBlockDimensions> = OnceLock::new();
fn get_cached_block_dimensions<T: BlockDataProvider>(
block: &T,
) -> Result<CachedBlockDimensions, TransferError> {
Ok(*BLOCK_DIMENSIONS_CACHE
.get_or_init(|| calculate_block_dimensions_from_layout(block).unwrap()))
}
fn calculate_block_dimensions_from_layout<T: BlockDataProvider>(
block: &T,
) -> Result<CachedBlockDimensions, TransferError> {
let block_data = block.block_data();
// Get dimensions directly from layout (pre-computed values)
let num_layers = block_data.num_layers();
let num_outer_dims = block_data.num_outer_dims();
let layer_size = block_data.layer_view(0, 0).map(|v| v.size()).unwrap_or(0);
Ok(CachedBlockDimensions {
num_layers,
num_outer_dims,
layer_size,
})
}
pub fn copy_blocks_with_customized_kernel<'a, Source, Destination>(
sources: &'a [Source],
destinations: &'a mut [Destination],
stream: &CudaStream,
ctx: &crate::block_manager::block::transfer::TransferContext,
) -> Result<Option<(Vec<u64>, usize)>, TransferError>
where
Source: BlockDataProvider,
Destination: BlockDataProviderMut,
{
let _context_guard = stream.context().bind_to_thread();
// Get cached dimensions (calculated once per program lifetime!)
let dims = get_cached_block_dimensions(&sources[0])?;
// Use cached dimensions
let (src_addresses, dst_addresses) =
collect_kv_addresses(sources, destinations, dims.num_layers, dims.num_outer_dims)?;
tracing::debug!(
"Using vectorized_copy for {} blocks [{}L×{}O×{}B], {} address pairs",
sources.len(),
dims.num_layers,
dims.num_outer_dims,
dims.layer_size,
src_addresses.len()
);
// Use pool-based approach with TransferResources
let resources = crate::block_manager::block::transfer::context::TransferResources::acquire_for_kernel_launch(
ctx,
src_addresses.len()
)?;
// Copy addresses to pinned buffers
resources.copy_addresses_to_buffers(&src_addresses, &dst_addresses)?;
tracing::debug!(
" Using pooled pinned buffers: src=0x{:x}, dst=0x{:x} ({} address pairs)",
resources.src_ptr(),
resources.dst_ptr(),
src_addresses.len()
);
// Launch kernel with pooled resources (addresses already copied)
unsafe {
launch_copy_kernel_direct(
resources.src_ptr(),
resources.dst_ptr(),
src_addresses.len(),
dims.layer_size,
stream,
)?;
}
tracing::debug!("vectorized_copy completed - resources will be returned to pool automatically");
Ok(None) // No manual cleanup needed - TransferResources handles it via Drop
}
/// Copy a block from a source to a destination using CUDA memcpy /// Copy a block from a source to a destination using CUDA memcpy
pub fn copy_block<'a, Source, Destination>( pub fn copy_block<'a, Source, Destination>(
sources: &'a Source, sources: &'a Source,
...@@ -159,11 +401,6 @@ unsafe fn cuda_memcpy_h2d( ...@@ -159,11 +401,6 @@ unsafe fn cuda_memcpy_h2d(
) -> Result<(), TransferError> { ) -> Result<(), TransferError> {
debug_assert!(!src_ptr.is_null(), "Source host pointer is null"); debug_assert!(!src_ptr.is_null(), "Source host pointer is null");
debug_assert!(!dst_ptr.is_null(), "Destination device pointer is null"); debug_assert!(!dst_ptr.is_null(), "Destination device pointer is null");
debug_assert!(
(src_ptr as usize + size <= dst_ptr as usize)
|| (dst_ptr as usize + size <= src_ptr as usize),
"Source and destination device memory regions must not overlap for D2D copy"
);
unsafe { unsafe {
let src_slice = std::slice::from_raw_parts(src_ptr, size); let src_slice = std::slice::from_raw_parts(src_ptr, size);
...@@ -183,11 +420,6 @@ unsafe fn cuda_memcpy_d2h( ...@@ -183,11 +420,6 @@ unsafe fn cuda_memcpy_d2h(
) -> Result<(), TransferError> { ) -> Result<(), TransferError> {
debug_assert!(!src_ptr.is_null(), "Source device pointer is null"); debug_assert!(!src_ptr.is_null(), "Source device pointer is null");
debug_assert!(!dst_ptr.is_null(), "Destination host pointer is null"); debug_assert!(!dst_ptr.is_null(), "Destination host pointer is null");
debug_assert!(
(src_ptr as usize + size <= dst_ptr as usize)
|| (dst_ptr as usize + size <= src_ptr as usize),
"Source and destination device memory regions must not overlap for D2D copy"
);
unsafe { unsafe {
let dst_slice = std::slice::from_raw_parts_mut(dst_ptr, size); let dst_slice = std::slice::from_raw_parts_mut(dst_ptr, size);
...@@ -220,6 +452,172 @@ unsafe fn cuda_memcpy_d2d( ...@@ -220,6 +452,172 @@ unsafe fn cuda_memcpy_d2d(
Ok(()) Ok(())
} }
// Load the vectorized_copy module from FATBIN
fn get_copy_kernel_module() -> Result<cudarc::driver::sys::CUmodule, TransferError> {
let mut module_guard = COPY_KERNEL_MODULE.lock().unwrap();
if let Some(module_ptr) = *module_guard {
return Ok(module_ptr as cudarc::driver::sys::CUmodule);
}
// Load the module on first access
let module = match load_embedded_fatbin() {
Ok(module) => {
tracing::debug!("Successfully loaded embedded FATBIN module");
module
}
Err(embedded_err) => {
tracing::debug!("Embedded FATBIN loading failed: {:?}", embedded_err);
match load_runtime_fatbin() {
Ok(module) => {
tracing::debug!("Successfully loaded runtime FATBIN module");
module
}
Err(runtime_err) => {
tracing::error!(" Both FATBIN loading methods failed:");
tracing::error!(" Embedded error: {:?}", embedded_err);
tracing::error!(" Runtime error: {:?}", runtime_err);
return Err(TransferError::ExecutionError(
"No vectorized_copy FATBIN found (tried embedded and runtime paths)"
.to_string(),
));
}
}
}
};
let module_ptr = module as usize;
*module_guard = Some(module_ptr);
Ok(module as cudarc::driver::sys::CUmodule)
}
// Get the vectorized_copy function
fn get_copy_kernel() -> Result<cudarc::driver::sys::CUfunction, TransferError> {
let mut func_guard = COPY_KERNEL_FUNCTION.lock().unwrap();
if let Some(func_ptr) = *func_guard {
return Ok(func_ptr as cudarc::driver::sys::CUfunction);
}
// Load the function on first access
let module = get_copy_kernel_module()?;
let func = unsafe {
let mut func = std::ptr::null_mut();
let func_name = std::ffi::CString::new("vectorised_copy").unwrap();
let result =
cudarc::driver::sys::cuModuleGetFunction(&mut func, module, func_name.as_ptr());
if result == cudarc::driver::sys::cudaError_enum::CUDA_SUCCESS {
func
} else {
return Err(TransferError::ExecutionError(format!(
"Failed to get kernel function: {:?}",
result
)));
}
};
let func_ptr = func as usize;
*func_guard = Some(func_ptr);
Ok(func as cudarc::driver::sys::CUfunction)
}
// Try to load embedded FATBIN (compile-time) - only compiled when FATBIN is available
#[cfg(have_vec_copy_fatbin)]
fn load_embedded_fatbin() -> Result<cudarc::driver::sys::CUmodule, cudarc::driver::DriverError> {
// FATBIN was copied to OUT_DIR by build.rs and embedded here
const FATBIN: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/vectorized_copy.fatbin"));
tracing::debug!("Loading embedded FATBIN ({} bytes)", FATBIN.len());
unsafe {
let mut module = std::ptr::null_mut();
let result = cudarc::driver::sys::cuModuleLoadData(
&mut module,
FATBIN.as_ptr() as *const std::ffi::c_void,
);
if result == cudarc::driver::sys::cudaError_enum::CUDA_SUCCESS {
tracing::debug!("Embedded FATBIN module loaded successfully: {:p}", module);
return Ok(module);
} else {
tracing::error!(
"Embedded FATBIN cuModuleLoadData failed with CUDA error: {:?}",
result
);
}
}
Err(cudarc::driver::DriverError(
cudarc::driver::sys::cudaError_enum::CUDA_ERROR_FILE_NOT_FOUND,
))
}
// Fallback implementation when FATBIN is not available at compile time
#[cfg(not(have_vec_copy_fatbin))]
fn load_embedded_fatbin() -> Result<cudarc::driver::sys::CUmodule, cudarc::driver::DriverError> {
tracing::debug!("No embedded FATBIN available (not compiled with have_vec_copy_fatbin)");
Err(cudarc::driver::DriverError(
cudarc::driver::sys::cudaError_enum::CUDA_ERROR_FILE_NOT_FOUND,
))
}
// Try to load FATBIN from filesystem (runtime)
fn load_runtime_fatbin() -> Result<cudarc::driver::sys::CUmodule, cudarc::driver::DriverError> {
// 1. Check runtime environment variable first
if let Ok(runtime_path) = std::env::var("DYNAMO_FATBIN_PATH")
&& let Ok(fatbin_data) = std::fs::read(&runtime_path)
{
tracing::debug!("Loading FATBIN from runtime env var: {}", runtime_path);
unsafe {
let mut module = std::ptr::null_mut();
let result = cudarc::driver::sys::cuModuleLoadData(
&mut module,
fatbin_data.as_ptr() as *const std::ffi::c_void,
);
if result == cudarc::driver::sys::cudaError_enum::CUDA_SUCCESS {
tracing::debug!("Runtime FATBIN module loaded successfully: {:p}", module);
return Ok(module);
} else {
tracing::error!(
"Runtime FATBIN cuModuleLoadData failed with CUDA error: {:?}",
result
);
}
}
}
// 2. Check standard runtime locations
let runtime_paths = ["./src/block_manager/block/transfer/kernels/vectorized_copy.fatbin"];
for path in &runtime_paths {
if let Ok(fatbin_data) = std::fs::read(path) {
tracing::debug!("Loading FATBIN from runtime path: {}", path);
unsafe {
let mut module = std::ptr::null_mut();
let result = cudarc::driver::sys::cuModuleLoadData(
&mut module,
fatbin_data.as_ptr() as *const std::ffi::c_void,
);
if result == cudarc::driver::sys::cudaError_enum::CUDA_SUCCESS {
tracing::debug!(
"Runtime path FATBIN module loaded successfully: {:p}",
module
);
return Ok(module);
} else {
tracing::error!(
"Runtime path FATBIN cuModuleLoadData failed with CUDA error: {:?}",
result
);
}
}
} else {
tracing::debug!("Could not read FATBIN file: {}", path);
}
}
Err(cudarc::driver::DriverError(
cudarc::driver::sys::cudaError_enum::CUDA_ERROR_FILE_NOT_FOUND,
))
}
#[cfg(all(test, feature = "testing-cuda"))] #[cfg(all(test, feature = "testing-cuda"))]
mod tests { mod tests {
use super::*; use super::*;
......
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
use super::*; use super::*;
use futures::future::try_join_all;
use nixl_sys::NixlDescriptor; use nixl_sys::NixlDescriptor;
use utils::*; use utils::*;
use zmq::*; use zmq::*;
...@@ -18,6 +19,7 @@ use crate::block_manager::{ ...@@ -18,6 +19,7 @@ use crate::block_manager::{
transfer::{TransferContext, WriteTo, WriteToStrategy}, transfer::{TransferContext, WriteTo, WriteToStrategy},
}, },
connector::scheduler::{SchedulingDecision, TransferSchedulerClient}, connector::scheduler::{SchedulingDecision, TransferSchedulerClient},
offload::MAX_TRANSFER_BATCH_SIZE,
storage::{DeviceStorage, DiskStorage, Local, PinnedStorage}, storage::{DeviceStorage, DiskStorage, Local, PinnedStorage},
}; };
...@@ -28,6 +30,59 @@ use std::{any::Any, sync::Arc}; ...@@ -28,6 +30,59 @@ use std::{any::Any, sync::Arc};
type LocalBlock<S, M> = Block<S, locality::Local, M>; type LocalBlock<S, M> = Block<S, locality::Local, M>;
type LocalBlockDataList<S> = Vec<LocalBlockData<S>>; type LocalBlockDataList<S> = Vec<LocalBlockData<S>>;
/// A batching wrapper for connector transfers to prevent resource exhaustion.
/// Splits large transfers into smaller batches that can be handled by the resource pools.
#[derive(Clone, Debug)]
pub struct ConnectorTransferBatcher {
max_batch_size: usize,
}
impl ConnectorTransferBatcher {
pub fn new() -> Self {
Self {
max_batch_size: MAX_TRANSFER_BATCH_SIZE,
}
}
pub async fn execute_batched_transfer(
&self,
handler: &BlockTransferHandler,
request: BlockTransferRequest,
) -> Result<()> {
let blocks = request.blocks();
let num_blocks = blocks.len();
if num_blocks <= self.max_batch_size {
return handler.execute_transfer_direct(request).await;
}
let batches = blocks.chunks(self.max_batch_size);
let batch_futures: Vec<_> = batches
.map(|batch| {
let batch_request = BlockTransferRequest {
from_pool: *request.from_pool(),
to_pool: *request.to_pool(),
blocks: batch.to_vec(),
connector_req: None,
};
handler.execute_transfer_direct(batch_request)
})
.collect();
// Execute all batches concurrently
tracing::debug!("Executing {} batches concurrently", batch_futures.len());
match try_join_all(batch_futures).await {
Ok(_) => Ok(()),
Err(e) => {
tracing::error!("Batched connector transfer failed: {}", e);
Err(e)
}
}
}
}
/// A handler for all block transfers. Wraps a group of [`BlockTransferPoolManager`]s. /// A handler for all block transfers. Wraps a group of [`BlockTransferPoolManager`]s.
#[derive(Clone)] #[derive(Clone)]
pub struct BlockTransferHandler { pub struct BlockTransferHandler {
...@@ -36,6 +91,7 @@ pub struct BlockTransferHandler { ...@@ -36,6 +91,7 @@ pub struct BlockTransferHandler {
disk: Option<LocalBlockDataList<DiskStorage>>, disk: Option<LocalBlockDataList<DiskStorage>>,
context: Arc<TransferContext>, context: Arc<TransferContext>,
scheduler_client: Option<TransferSchedulerClient>, scheduler_client: Option<TransferSchedulerClient>,
batcher: ConnectorTransferBatcher,
// add worker-connector scheduler client here // add worker-connector scheduler client here
} }
...@@ -54,6 +110,7 @@ impl BlockTransferHandler { ...@@ -54,6 +110,7 @@ impl BlockTransferHandler {
disk: Self::get_local_data(disk_blocks), disk: Self::get_local_data(disk_blocks),
context, context,
scheduler_client, scheduler_client,
batcher: ConnectorTransferBatcher::new(),
}) })
} }
...@@ -122,7 +179,13 @@ impl BlockTransferHandler { ...@@ -122,7 +179,13 @@ impl BlockTransferHandler {
} }
} }
/// Execute transfer with batching to prevent resource exhaustion
pub async fn execute_transfer(&self, request: BlockTransferRequest) -> Result<()> { pub async fn execute_transfer(&self, request: BlockTransferRequest) -> Result<()> {
self.batcher.execute_batched_transfer(self, request).await
}
/// Execute transfer directly without batching (used by the batcher)
pub async fn execute_transfer_direct(&self, request: BlockTransferRequest) -> Result<()> {
tracing::debug!( tracing::debug!(
"Performing transfer of {} blocks from {:?} to {:?}", "Performing transfer of {} blocks from {:?} to {:?}",
request.blocks().len(), request.blocks().len(),
......
...@@ -11,9 +11,13 @@ use zmq::*; ...@@ -11,9 +11,13 @@ use zmq::*;
use crate::block_manager::{ use crate::block_manager::{
BasicMetadata, BlockMetadata, LayoutConfigBuilder, NixlLayout, Storage, BasicMetadata, BlockMetadata, LayoutConfigBuilder, NixlLayout, Storage,
block::{Block, layout_to_blocks, locality, transfer::TransferContext}, block::{
Block, layout_to_blocks, locality,
transfer::{PoolConfig, TransferContext},
},
connector::scheduler::TransferSchedulerClient, connector::scheduler::TransferSchedulerClient,
layout::LayoutType, layout::LayoutType,
offload::{MAX_CONCURRENT_TRANSFERS, MAX_TRANSFER_BATCH_SIZE},
storage::{DeviceAllocator, DeviceStorage, DiskAllocator, PinnedAllocator, torch::TorchTensor}, storage::{DeviceAllocator, DeviceStorage, DiskAllocator, PinnedAllocator, torch::TorchTensor},
}; };
...@@ -570,6 +574,14 @@ impl KvbmWorker { ...@@ -570,6 +574,14 @@ impl KvbmWorker {
let agent = build_agent(worker_id, leader_data.num_disk_blocks > 0)?; let agent = build_agent(worker_id, leader_data.num_disk_blocks > 0)?;
let pool_config = PoolConfig {
enable_pool: true,
max_concurrent_transfers: MAX_CONCURRENT_TRANSFERS,
max_transfer_batch_size: MAX_TRANSFER_BATCH_SIZE,
num_outer_components: device_layout.config().outer_dim,
num_layers: device_layout.config().num_layers,
};
let transfer_context = Arc::new(TransferContext::new( let transfer_context = Arc::new(TransferContext::new(
Arc::new(Some(agent)), Arc::new(Some(agent)),
DeviceAllocator::new(config.device_id) DeviceAllocator::new(config.device_id)
...@@ -578,6 +590,7 @@ impl KvbmWorker { ...@@ -578,6 +590,7 @@ impl KvbmWorker {
.new_stream() .new_stream()
.unwrap(), .unwrap(),
Handle::current(), Handle::current(),
Some(pool_config),
)); ));
// Build our device, host, and disk block lists. // Build our device, host, and disk block lists.
...@@ -622,7 +635,6 @@ impl KvbmWorker { ...@@ -622,7 +635,6 @@ impl KvbmWorker {
None None
}; };
// Create the handler for our active message worker.
let block_transfer_handler = BlockTransferHandler::new( let block_transfer_handler = BlockTransferHandler::new(
device_blocks, device_blocks,
host_blocks, host_blocks,
......
...@@ -34,12 +34,13 @@ ...@@ -34,12 +34,13 @@
use super::block::{ use super::block::{
BlockError, BlockMetadata, BlockState, ImmutableBlock, MutableBlock, BlockError, BlockMetadata, BlockState, ImmutableBlock, MutableBlock,
locality::LocalityProvider, transfer::TransferContext, locality::LocalityProvider,
transfer::{PoolConfig, TransferContext},
}; };
use super::metrics::{BlockManagerMetrics, PoolMetrics}; use super::metrics::{BlockManagerMetrics, PoolMetrics};
use super::pool::{BlockPool, BlockPoolError}; use super::pool::{BlockPool, BlockPoolError};
use super::storage::{Cuda, Storage}; use super::storage::{Cuda, Storage};
use super::{DeviceStorage, DiskStorage, PinnedStorage}; use super::{DeviceStorage, DiskStorage, KvManagerModelConfig, PinnedStorage};
use nixl_sys::Agent as NixlAgent; use nixl_sys::Agent as NixlAgent;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Handle; use tokio::runtime::Handle;
...@@ -63,8 +64,17 @@ use request::{BlockResult, OffloadRequest, OffloadRequestKey, OnboardRequest}; ...@@ -63,8 +64,17 @@ use request::{BlockResult, OffloadRequest, OffloadRequestKey, OnboardRequest};
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle; use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
const MAX_CONCURRENT_TRANSFERS: usize = 4; pub const MAX_CONCURRENT_TRANSFERS: usize = 4;
const MAX_TRANSFER_BATCH_SIZE: usize = 16; pub const MAX_TRANSFER_BATCH_SIZE: usize = 16;
/// Configuration for creating an OffloadManager
pub struct OffloadManagerConfig {
pub nixl_agent: Arc<Option<NixlAgent>>,
pub async_rt_handle: Handle,
pub metrics: Arc<BlockManagerMetrics>,
pub cancellation_token: CancellationToken,
pub model_config: KvManagerModelConfig,
}
/// The offload manager handles all block transfers between different cache levels. /// The offload manager handles all block transfers between different cache levels.
pub struct OffloadManager<Locality: LocalityProvider, Metadata: BlockMetadata> { pub struct OffloadManager<Locality: LocalityProvider, Metadata: BlockMetadata> {
...@@ -94,10 +104,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -94,10 +104,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
disk: Option<Arc<dyn BlockPool<DiskStorage, Locality, Metadata>>>, disk: Option<Arc<dyn BlockPool<DiskStorage, Locality, Metadata>>>,
host: Option<Arc<dyn BlockPool<PinnedStorage, Locality, Metadata>>>, host: Option<Arc<dyn BlockPool<PinnedStorage, Locality, Metadata>>>,
device: Option<Arc<dyn BlockPool<DeviceStorage, Locality, Metadata>>>, device: Option<Arc<dyn BlockPool<DeviceStorage, Locality, Metadata>>>,
nixl_agent: Arc<Option<NixlAgent>>, config: OffloadManagerConfig,
async_rt_handle: Handle,
metrics: Arc<BlockManagerMetrics>,
cancellation_token: CancellationToken,
) -> Result<Arc<Self>> { ) -> Result<Arc<Self>> {
let (device_offload_tx, device_offload_rx) = mpsc::unbounded_channel(); let (device_offload_tx, device_offload_rx) = mpsc::unbounded_channel();
let (host_offload_tx, host_offload_rx) = mpsc::unbounded_channel(); let (host_offload_tx, host_offload_rx) = mpsc::unbounded_channel();
...@@ -118,16 +125,25 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -118,16 +125,25 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
let cuda_ctx = Cuda::device_or_create(0)?; let cuda_ctx = Cuda::device_or_create(0)?;
let pool_config = PoolConfig {
enable_pool: true,
max_concurrent_transfers: MAX_CONCURRENT_TRANSFERS,
max_transfer_batch_size: MAX_TRANSFER_BATCH_SIZE,
num_outer_components: config.model_config.outer_dim,
num_layers: config.model_config.num_layers,
};
// We want cuda offloads to happen in parallel with host onboards, so we need to use a different stream. // We want cuda offloads to happen in parallel with host onboards, so we need to use a different stream.
let device_offload_transfer_ctx = Arc::new(TransferContext::new( let device_offload_transfer_ctx = Arc::new(TransferContext::new(
nixl_agent.clone(), config.nixl_agent.clone(),
cuda_ctx.new_stream()?, cuda_ctx.new_stream()?,
async_rt_handle.clone(), config.async_rt_handle.clone(),
Some(pool_config),
)); ));
let device_metrics = metrics.pool("device"); let device_metrics = config.metrics.pool("device");
let host_metrics = metrics.pool("host"); let host_metrics = config.metrics.pool("host");
let disk_metrics = metrics.pool("disk"); let disk_metrics = config.metrics.pool("disk");
// Device -> Host offload // Device -> Host offload
let device_to_host_task = OffloadManager::offload_worker( let device_to_host_task = OffloadManager::offload_worker(
...@@ -138,30 +154,31 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -138,30 +154,31 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
LocalTransferManager::new( LocalTransferManager::new(
device_offload_transfer_ctx, device_offload_transfer_ctx,
MAX_CONCURRENT_TRANSFERS, MAX_CONCURRENT_TRANSFERS,
&async_rt_handle, &config.async_rt_handle,
cancellation_token.clone(), config.cancellation_token.clone(),
device_metrics.clone(), device_metrics.clone(),
"offload_bw".to_string(), "offload_bw".to_string(),
)?, )?,
MAX_TRANSFER_BATCH_SIZE, MAX_TRANSFER_BATCH_SIZE,
&async_rt_handle, &config.async_rt_handle,
cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
device_metrics.clone(), device_metrics.clone(),
cancellation_token.clone(), config.cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
|_| device_to_host_task, |_| device_to_host_task,
cancellation_token.clone(), config.cancellation_token.clone(),
"Device -> Host offload worker", "Device -> Host offload worker",
&async_rt_handle, &config.async_rt_handle,
)? )?
.detach(); .detach();
let transfer_ctx = Arc::new(TransferContext::new( let transfer_ctx = Arc::new(TransferContext::new(
nixl_agent.clone(), config.nixl_agent.clone(),
cuda_ctx.new_stream()?, cuda_ctx.new_stream()?,
async_rt_handle.clone(), config.async_rt_handle.clone(),
None,
)); ));
// Host -> Disk offload // Host -> Disk offload
...@@ -173,23 +190,23 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -173,23 +190,23 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
LocalTransferManager::new( LocalTransferManager::new(
transfer_ctx.clone(), transfer_ctx.clone(),
MAX_CONCURRENT_TRANSFERS, MAX_CONCURRENT_TRANSFERS,
&async_rt_handle, &config.async_rt_handle,
cancellation_token.clone(), config.cancellation_token.clone(),
host_metrics.clone(), host_metrics.clone(),
"offload_bw".to_string(), "offload_bw".to_string(),
)?, )?,
MAX_TRANSFER_BATCH_SIZE, MAX_TRANSFER_BATCH_SIZE,
&async_rt_handle, &config.async_rt_handle,
cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
host_metrics.clone(), host_metrics.clone(),
cancellation_token.clone(), config.cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
|_| host_to_disk_task, |_| host_to_disk_task,
cancellation_token.clone(), config.cancellation_token.clone(),
"Host -> Disk offload worker", "Host -> Disk offload worker",
&async_rt_handle, &config.async_rt_handle,
)? )?
.detach(); .detach();
...@@ -202,23 +219,23 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -202,23 +219,23 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
LocalTransferManager::new( LocalTransferManager::new(
transfer_ctx.clone(), transfer_ctx.clone(),
MAX_CONCURRENT_TRANSFERS, MAX_CONCURRENT_TRANSFERS,
&async_rt_handle, &config.async_rt_handle,
cancellation_token.clone(), config.cancellation_token.clone(),
host_metrics.clone(), host_metrics.clone(),
"onboard_bw".to_string(), "onboard_bw".to_string(),
)?, )?,
MAX_TRANSFER_BATCH_SIZE, MAX_TRANSFER_BATCH_SIZE,
&async_rt_handle, &config.async_rt_handle,
cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
host_metrics.clone(), host_metrics.clone(),
cancellation_token.clone(), config.cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
|_| host_to_device_task, |_| host_to_device_task,
cancellation_token.clone(), config.cancellation_token.clone(),
"Host -> Device onboarding worker", "Host -> Device onboarding worker",
&async_rt_handle, &config.async_rt_handle,
)? )?
.detach(); .detach();
...@@ -231,23 +248,23 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -231,23 +248,23 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
LocalTransferManager::new( LocalTransferManager::new(
transfer_ctx.clone(), transfer_ctx.clone(),
MAX_CONCURRENT_TRANSFERS, MAX_CONCURRENT_TRANSFERS,
&async_rt_handle, &config.async_rt_handle,
cancellation_token.clone(), config.cancellation_token.clone(),
disk_metrics.clone(), disk_metrics.clone(),
"onboard_bw".to_string(), "onboard_bw".to_string(),
)?, )?,
MAX_TRANSFER_BATCH_SIZE, MAX_TRANSFER_BATCH_SIZE,
&async_rt_handle, &config.async_rt_handle,
cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
disk_metrics.clone(), disk_metrics.clone(),
cancellation_token.clone(), config.cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
|_| disk_to_device_task, |_| disk_to_device_task,
cancellation_token.clone(), config.cancellation_token.clone(),
"Disk -> Device onboarding worker", "Disk -> Device onboarding worker",
&async_rt_handle, &config.async_rt_handle,
)? )?
.detach(); .detach();
...@@ -734,14 +751,27 @@ mod tests { ...@@ -734,14 +751,27 @@ mod tests {
let async_rt_handle = Handle::current(); let async_rt_handle = Handle::current();
let minimal_config = KvManagerModelConfig::builder()
.num_layers(config.num_layers)
.outer_dim(config.outer_dim) // K and V
.page_size(config.page_size) // Minimal page size
.inner_dim(config.inner_dim) // Small inner dim
.build()
.expect("Failed to build minimal config");
let config = OffloadManagerConfig {
nixl_agent: agent_arc,
async_rt_handle,
metrics: BlockManagerMetrics::new(&Arc::new(Registry::new()))?,
cancellation_token: CancellationToken::new(),
model_config: minimal_config,
};
let manager = OffloadManager::new( let manager = OffloadManager::new(
disk_pool.clone(), disk_pool.clone(),
host_pool.clone(), host_pool.clone(),
device_pool.clone(), device_pool.clone(),
agent_arc, config,
async_rt_handle,
BlockManagerMetrics::new(&Arc::new(Registry::new()))?,
CancellationToken::new(),
)?; )?;
Ok((manager, device_pool, host_pool, disk_pool)) Ok((manager, device_pool, host_pool, disk_pool))
......
...@@ -11,7 +11,7 @@ use crate::block_manager::offload::request::BlockResult; ...@@ -11,7 +11,7 @@ use crate::block_manager::offload::request::BlockResult;
use super::*; use super::*;
// use super::offload::OffloadManager; // use super::offload::{OffloadManager, OffloadManagerConfig};
use super::{ use super::{
block::{ block::{
Block, GlobalRegistry, ImmutableBlock, factory::LocalBlockDataFactory, Block, GlobalRegistry, ImmutableBlock, factory::LocalBlockDataFactory,
...@@ -20,7 +20,7 @@ use super::{ ...@@ -20,7 +20,7 @@ use super::{
config::NixlOptions, config::NixlOptions,
events::{EventManager, NullEventManager}, events::{EventManager, NullEventManager},
metrics::BlockManagerMetrics, metrics::BlockManagerMetrics,
offload::OffloadManager, offload::{OffloadManager, OffloadManagerConfig},
}; };
use derive_getters::Dissolve; use derive_getters::Dissolve;
use std::sync::Arc; use std::sync::Arc;
...@@ -103,6 +103,7 @@ impl<R: LogicalResources, Metadata: BlockMetadata> ...@@ -103,6 +103,7 @@ impl<R: LogicalResources, Metadata: BlockMetadata>
KvBlockManagerState<locality::Logical<R>, Metadata> KvBlockManagerState<locality::Logical<R>, Metadata>
{ {
pub async fn new(config: KvBlockManagerConfig, logical_resources: R) -> Result<Arc<Self>> { pub async fn new(config: KvBlockManagerConfig, logical_resources: R) -> Result<Arc<Self>> {
let model_config = config.model.clone();
let mut resources = Resources::new(config)?; let mut resources = Resources::new(config)?;
let block_data_factories = let block_data_factories =
logical::LogicalBlockFactories::new(&mut resources, logical_resources)?; logical::LogicalBlockFactories::new(&mut resources, logical_resources)?;
...@@ -145,14 +146,19 @@ impl<R: LogicalResources, Metadata: BlockMetadata> ...@@ -145,14 +146,19 @@ impl<R: LogicalResources, Metadata: BlockMetadata>
} }
}; };
let offload_config = OffloadManagerConfig {
nixl_agent: resources.nixl_agent.clone(),
async_rt_handle: resources.async_rt_handle.clone(),
metrics: resources.metrics.clone(),
cancellation_token: resources.cancellation_token.clone(),
model_config,
};
let offload_manager = OffloadManager::new( let offload_manager = OffloadManager::new(
disk_pool.clone(), disk_pool.clone(),
host_pool.clone(), host_pool.clone(),
device_pool.clone(), device_pool.clone(),
resources.nixl_agent.clone(), offload_config,
resources.async_rt_handle.clone(),
resources.metrics.clone(),
resources.cancellation_token.clone(),
)?; )?;
let resources = Arc::new(resources); let resources = Arc::new(resources);
...@@ -206,6 +212,7 @@ impl<R: LogicalResources, Metadata: BlockMetadata> ...@@ -206,6 +212,7 @@ impl<R: LogicalResources, Metadata: BlockMetadata>
// - this will allow us to use the locality abstraction to build our factories and block pools // - this will allow us to use the locality abstraction to build our factories and block pools
impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> { impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> {
pub async fn new(config: KvBlockManagerConfig) -> Result<Arc<Self>> { pub async fn new(config: KvBlockManagerConfig) -> Result<Arc<Self>> {
let model_config = config.model.clone();
let mut resources = Resources::new(config)?; let mut resources = Resources::new(config)?;
let block_data_factories = local::LocalBlockDataFactories::new(&mut resources)?; let block_data_factories = local::LocalBlockDataFactories::new(&mut resources)?;
...@@ -254,14 +261,19 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> { ...@@ -254,14 +261,19 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> {
local_block_set.set_nixl_metadata(nixl_agent.get_local_md()?); local_block_set.set_nixl_metadata(nixl_agent.get_local_md()?);
} }
let offload_config = OffloadManagerConfig {
nixl_agent: resources.nixl_agent.clone(),
async_rt_handle: resources.async_rt_handle.clone(),
metrics: resources.metrics.clone(),
cancellation_token: resources.cancellation_token.clone(),
model_config,
};
let offload_manager = OffloadManager::new( let offload_manager = OffloadManager::new(
disk_pool.clone(), disk_pool.clone(),
host_pool.clone(), host_pool.clone(),
device_pool.clone(), device_pool.clone(),
resources.nixl_agent.clone(), offload_config,
resources.async_rt_handle.clone(),
resources.metrics.clone(),
resources.cancellation_token.clone(),
)?; )?;
let resources = Arc::new(resources); let resources = Arc::new(resources);
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; use std::sync::{Condvar, Mutex};
use tokio::sync::Notify; use tokio::sync::Notify;
/// Trait for items that can be returned to a pool /// Trait for items that can be returned to a pool
...@@ -268,9 +268,126 @@ impl<T: Returnable> Clone for Pool<T> { ...@@ -268,9 +268,126 @@ impl<T: Returnable> Clone for Pool<T> {
} }
} }
} }
pub struct SyncPool<T: Returnable> {
state: Arc<SyncPoolState<T>>,
capacity: usize,
}
struct SyncPoolState<T: Returnable> {
pool: Mutex<VecDeque<PoolValue<T>>>,
available: Condvar,
}
impl<T: Returnable> SyncPool<T> {
pub fn new(initial_elements: Vec<PoolValue<T>>) -> Self {
let capacity = initial_elements.len();
let pool = initial_elements
.into_iter()
.collect::<VecDeque<PoolValue<T>>>();
let state = Arc::new(SyncPoolState {
pool: Mutex::new(pool),
available: Condvar::new(),
});
Self { state, capacity }
}
pub fn new_direct(initial_elements: Vec<T>) -> Self {
let initial_values = initial_elements
.into_iter()
.map(PoolValue::from_direct)
.collect();
Self::new(initial_values)
}
pub fn try_acquire(&self) -> Option<SyncPoolItem<T>> {
let mut pool = self.state.pool.lock().unwrap();
pool.pop_front()
.map(|value| SyncPoolItem::new(value, self.state.clone()))
}
pub fn acquire_blocking(&self) -> SyncPoolItem<T> {
let mut pool = self.state.pool.lock().unwrap();
while pool.is_empty() {
tracing::debug!("SyncPool: waiting for available resource (pool empty)");
pool = self.state.available.wait(pool).unwrap();
tracing::debug!(
"SyncPool: woke up, checking pool again (size: {})",
pool.len()
);
}
let value = pool.pop_front().unwrap();
tracing::debug!("SyncPool: acquired resource, pool size now: {}", pool.len());
SyncPoolItem::new(value, self.state.clone())
}
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl<T: Returnable> Clone for SyncPool<T> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
capacity: self.capacity,
}
}
}
pub struct SyncPoolItem<T: Returnable> {
value: Option<PoolValue<T>>,
state: Arc<SyncPoolState<T>>,
}
impl<T: Returnable> SyncPoolItem<T> {
fn new(value: PoolValue<T>, state: Arc<SyncPoolState<T>>) -> Self {
Self {
value: Some(value),
state,
}
}
}
impl<T: Returnable> Deref for SyncPoolItem<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.value.as_ref().unwrap().get()
}
}
impl<T: Returnable> DerefMut for SyncPoolItem<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.value.as_mut().unwrap().get_mut()
}
}
impl<T: Returnable> Drop for SyncPoolItem<T> {
fn drop(&mut self) {
if let Some(mut value) = self.value.take() {
value.on_return();
let mut pool = self.state.pool.lock().unwrap();
pool.push_back(value);
tracing::debug!(
"SyncPool: returned resource, pool size now: {}, notifying waiters",
pool.len()
);
self.state.available.notify_one();
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use tokio::time::{Duration, timeout}; use tokio::time::{Duration, timeout};
// Implement Returnable for u32 just for testing // Implement Returnable for u32 just for testing
...@@ -412,4 +529,145 @@ mod tests { ...@@ -412,4 +529,145 @@ mod tests {
// _token: /* can't create this */ // _token: /* can't create this */
// }; // };
} }
#[test]
fn test_sync_pool_basic_acquire_release() {
let initial_elements = vec![1u32, 2, 3];
let pool = SyncPool::new_direct(initial_elements);
// Try acquire (non-blocking)
let item1 = pool.try_acquire().unwrap();
assert_eq!(*item1, 1);
let item2 = pool.try_acquire().unwrap();
assert_eq!(*item2, 2);
// Pool should have one item left
let item3 = pool.try_acquire().unwrap();
assert_eq!(*item3, 3);
// Pool should be empty now
assert!(pool.try_acquire().is_none());
// Drop items to return to pool
drop(item1); // Returns 0 (after on_return)
drop(item2); // Returns 0 (after on_return)
drop(item3); // Returns 0 (after on_return)
// Should be able to acquire again
let item = pool.try_acquire().unwrap();
assert_eq!(*item, 0); // Value was reset by on_return
}
#[test]
fn test_sync_pool_blocking_acquire() {
let pool = SyncPool::new_direct(vec![42u32]);
// Acquire the only item
let item = pool.acquire_blocking();
assert_eq!(*item, 42);
let pool_clone = pool.clone();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
// Spawn a thread that will wait for the item
let handle = thread::spawn(move || {
counter_clone.store(1, Ordering::SeqCst); // Mark that we're waiting
let waiting_item = pool_clone.acquire_blocking(); // This will block
counter_clone.store(2, Ordering::SeqCst); // Mark that we got it
assert_eq!(*waiting_item, 0); // Should be reset value
});
// Give the thread time to start waiting
thread::sleep(Duration::from_millis(10));
assert_eq!(counter.load(Ordering::SeqCst), 1); // Should be waiting
// Drop the item to trigger condvar notification
drop(item);
// Wait for the other thread to complete
handle.join().unwrap();
assert_eq!(counter.load(Ordering::SeqCst), 2); // Should have completed
}
#[test]
fn test_sync_pool_multiple_waiters() {
let pool = SyncPool::new_direct(vec![1u32]);
// Acquire the only item
let item = pool.acquire_blocking();
let pool_clone1 = pool.clone();
let pool_clone2 = pool.clone();
let completed = Arc::new(AtomicUsize::new(0));
let completed1 = completed.clone();
let completed2 = completed.clone();
// Spawn two threads that will wait
let handle1 = thread::spawn(move || {
let _item = pool_clone1.acquire_blocking(); // Will block
completed1.fetch_add(1, Ordering::SeqCst); // Mark completion
// Item drops here, potentially waking thread 2
});
let handle2 = thread::spawn(move || {
let _item = pool_clone2.acquire_blocking(); // Will block
completed2.fetch_add(1, Ordering::SeqCst); // Mark completion
// Item drops here
});
// Give threads time to start waiting
thread::sleep(Duration::from_millis(50));
assert_eq!(completed.load(Ordering::SeqCst), 0); // Both should be waiting
// Drop the item - should wake exactly one thread
drop(item);
// Wait for both threads to complete
handle1.join().unwrap();
handle2.join().unwrap();
// Both threads should have completed eventually
assert_eq!(completed.load(Ordering::SeqCst), 2);
}
#[test]
fn test_sync_vs_async_pool_compatibility() {
// Test that both pool types work with the same Returnable type
let async_pool = Pool::new_direct(vec![1u32, 2u32]);
let sync_pool = SyncPool::new_direct(vec![3u32, 4u32]);
// Both should work
let async_rt = tokio::runtime::Runtime::new().unwrap();
let async_item = async_rt.block_on(async { async_pool.acquire().await });
assert_eq!(*async_item, 1);
let sync_item = sync_pool.acquire_blocking();
assert_eq!(*sync_item, 3);
// Both use the same Returnable trait
drop(async_item); // Should reset to 0
drop(sync_item); // Should reset to 0
}
#[test]
fn test_sync_pool_condvar_performance() {
let pool = SyncPool::new_direct((0..10).collect::<Vec<u32>>());
let start = std::time::Instant::now();
// Rapid acquire/release cycles
for _ in 0..1000 {
let item = pool.acquire_blocking();
// Simulate minimal work
let _ = *item + 1;
drop(item); // Return to pool
}
let duration = start.elapsed();
println!("1000 sync pool operations took {:?}", duration);
// Should be fast (< 10ms on most systems)
assert!(duration < Duration::from_millis(50));
}
} }
...@@ -996,12 +996,6 @@ class TestDeterminism: ...@@ -996,12 +996,6 @@ class TestDeterminism:
# Wait for 10 seconds to make sure all transfers are complete # Wait for 10 seconds to make sure all transfers are complete
time.sleep(10) time.sleep(10)
# Reset cache
print("\n" + "=" * 50)
print("RESETTING CACHE AFTER WARMUP")
print("=" * 50)
tester.reset_prefix_cache()
time.sleep(10)
else: else:
print("Skipping warmup (already done in previous phase)") print("Skipping warmup (already done in previous phase)")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment