Unverified Commit d5840cda authored by Ziqi Fan's avatar Ziqi Fan Committed by GitHub
Browse files

feat: enable KVBM GPU offload to Disk bypassing CPU (#3510)


Signed-off-by: default avatarZiqi Fan <ziqif@nvidia.com>
parent 9ae98ed7
......@@ -19,7 +19,7 @@
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 1,
"id": 3,
"links": [],
"panels": [
{
......@@ -336,13 +336,109 @@
"title": "Offload Blocks - Host to Disk",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 18
},
"id": 12,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.0.1",
"targets": [
{
"disableTextWrap": false,
"editorMode": "code",
"expr": "kvbm_offload_blocks_d2d",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Offload Blocks - Device to Disk (Bypass Host)",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 18
"y": 26
},
"id": 6,
"panels": [],
......@@ -412,7 +508,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 19
"y": 27
},
"id": 4,
"options": {
......@@ -508,7 +604,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 19
"y": 27
},
"id": 8,
"options": {
......@@ -550,12 +646,12 @@
"list": []
},
"time": {
"from": "now-30m",
"from": "now-15m",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "KVBM Dashboard",
"uid": "3f679257-70a5-402c-92b4-05382337b548",
"version": 14
"version": 4
}
\ No newline at end of file
......@@ -25,7 +25,6 @@ To learn what KVBM is, please check [here](kvbm_architecture.md)
> - Ensure that `etcd` and `nats` are running before starting.
> - KVBM does not currently support CUDA graphs in TensorRT-LLM.
> - KVBM only supports TensorRT-LLM’s PyTorch backend.
> - To enable disk cache offloading, you must first enable a CPU memory cache offloading.
> - Disable partial reuse `enable_partial_reuse: false` in the LLM API config’s `kv_connector_config` to increase offloading cache hits.
> - KVBM requires TensorRT-LLM v1.1.0rc5 or newer.
> - Enabling KVBM metrics with TensorRT-LLM is still a work in progress.
......@@ -44,14 +43,25 @@ docker compose -f deploy/docker-compose.yml up -d
# launch the container
./container/run.sh --framework trtllm -it --mount-workspace --use-nixl-gds
# enable kv offloading to CPU memory
# Configure KVBM cache tiers (choose one of the following options):
# Option 1: CPU cache only (GPU -> CPU offloading)
# 4 means 4GB of pinned CPU memory would be used
export DYN_KVBM_CPU_CACHE_GB=4
# enable kv offloading to disk. Note: To enable disk cache offloading, you must first enable a CPU memory cache offloading.
# Option 2: Both CPU and Disk cache (GPU -> CPU -> Disk tiered offloading)
export DYN_KVBM_CPU_CACHE_GB=4
# 8 means 8GB of disk would be used
export DYN_KVBM_DISK_CACHE_GB=8
# [Experimental] Option 3: Disk cache only (GPU -> Disk direct offloading, bypassing CPU)
# NOTE: this option is only experimental and it might give out the best performance.
# NOTE: disk offload filtering is not support when using this option.
export DYN_KVBM_DISK_CACHE_GB=8
# Note: You can also use DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS or
# DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS to specify exact block counts instead of GB
# Allocating memory and disk storage can take some time.
# We recommend setting a higher timeout for leader–worker initialization.
# 1200 means 1200 seconds timeout
......
......@@ -59,7 +59,25 @@ cd $DYNAMO_HOME/components/backends/vllm
> To tune the size of CPU or disk cache, set `DYN_KVBM_CPU_CACHE_GB` and `DYN_KVBM_DISK_CACHE_GB` accordingly. We only set `DYN_KVBM_CPU_CACHE_GB=20` in both scripts above.
> [!NOTE]
> `DYN_KVBM_CPU_CACHE_GB` must be set and `DYN_KVBM_DISK_CACHE_GB` is optional.
> Configure KVBM cache tiers (choose one of the following options):
> ```bash
> # Option 1: CPU cache only (GPU -> CPU offloading)
> # 4 means 4GB of pinned CPU memory would be used
> export DYN_KVBM_CPU_CACHE_GB=4
>
> # Option 2: Both CPU and Disk cache (GPU -> CPU -> Disk tiered offloading)
> export DYN_KVBM_CPU_CACHE_GB=4
> # 8 means 8GB of disk would be used
> export DYN_KVBM_DISK_CACHE_GB=8
>
> # [Experimental] Option 3: Disk cache only (GPU -> Disk direct offloading, bypassing CPU)
> # NOTE: this option is only experimental and it might give out the best performance.
> # NOTE: disk offload filtering is not support when using this option.
> export DYN_KVBM_DISK_CACHE_GB=8
> ```
>
> You can also use "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS" or
> "DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS" to specify exact block counts instead of GB
> [!NOTE]
> When disk offloading is enabled, to extend SSD lifespan, disk offload filtering would be enabled by default. The current policy is only offloading KV blocks from CPU to disk if the blocks have frequency equal or more than `2`. Frequency is determined via doubling on cache hit (init with 1) and decrement by 1 on each time decay step.
......
......@@ -5,8 +5,9 @@ use std::{any::Any, cmp::max, sync::Arc};
use dynamo_llm::{
block_manager::{
Storage,
BlockPool, NixlRegisterableStorage, Storage,
block::{BlockMetadata, locality::LocalityProvider},
config::should_bypass_cpu_cache,
connector::protocol::{LeaderTransferRequest, RequestType, TransferType},
distributed::{BlockTransferPool, BlockTransferRequest, KvbmLeader},
},
......@@ -1282,42 +1283,98 @@ async fn process_offload_request(
leader: &Arc<KvbmLeader>,
kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> {
let request_id = &offload_req.request_id;
let operation_id = &offload_req.operation_id;
let request_id = offload_req.request_id.clone();
let operation_id = offload_req.operation_id;
tracing::debug!(
"Processing offload request for {} blocks",
offload_req.block_ids.len()
);
// 1. Acquire mutable host blocks
let host_blocks = block_manager
.host()
.unwrap()
// Determine if we should bypass CPU memory (G2) and offload directly from GPU (G1) to Disk (G3)
let bypass_cpu_mem = should_bypass_cpu_cache();
if bypass_cpu_mem {
// Direct G1 -> G3 path (Device to Disk, bypassing Host)
kvbm_metrics
.offload_blocks_d2d
.inc_by(offload_req.block_ids.len() as u64);
tracing::debug!(
request_id = %request_id,
operation_id = %operation_id,
"offloading directly to disk (bypassing host)"
);
process_offload_to_storage(
offload_req,
block_manager.disk().unwrap(),
BlockTransferPool::Disk,
leader,
&request_id,
&operation_id,
"disk",
)
.await?;
} else {
// Standard path: G1 -> G2 (Device to Host)
kvbm_metrics
.offload_blocks_d2h
.inc_by(offload_req.block_ids.len() as u64);
process_offload_to_storage(
offload_req,
block_manager.host().unwrap(),
BlockTransferPool::Host,
leader,
&request_id,
&operation_id,
"host",
)
.await?;
}
Ok(())
}
async fn process_offload_to_storage<S, L, M>(
offload_req: LocalOffloadRequest,
storage_pool: &dyn BlockPool<S, L, M>,
transfer_pool: BlockTransferPool,
leader: &Arc<KvbmLeader>,
request_id: &str,
operation_id: &uuid::Uuid,
storage_name: &str,
) -> anyhow::Result<()>
where
S: Storage + NixlRegisterableStorage,
L: LocalityProvider,
M: BlockMetadata,
{
// 1. Acquire mutable blocks
let blocks = storage_pool
.allocate_blocks(offload_req.block_ids.len())
.await?;
let token_blocks = offload_req.token_blocks;
let host_block_ids: Vec<usize> = host_blocks.iter().map(|b| b.block_id()).collect();
let allocated_block_ids: Vec<usize> = blocks.iter().map(|b| b.block_id()).collect();
let block_pairs: Vec<(usize, usize)> = offload_req
.block_ids
.into_iter()
.zip(host_block_ids.into_iter())
.zip(allocated_block_ids.into_iter())
.collect();
tracing::debug!(
request_id = request_id,
operation_id = %operation_id,
"offload - stage 1 complete"
"offload to {} - stage 1 complete",
storage_name
);
// 2. Apply token blocks
// create an iterator over the mutable blocks zipped with the token blocks
let mut blocks_to_register = Vec::new();
let zipped_blocks = host_blocks.into_iter().zip(token_blocks.into_iter());
let zipped_blocks = blocks.into_iter().zip(token_blocks.into_iter());
// apply the token blocks to the mutable blocks
for (mut mutable_block, token_block) in zipped_blocks {
mutable_block
.apply_token_block(token_block.clone())
......@@ -1328,14 +1385,14 @@ async fn process_offload_request(
tracing::debug!(
request_id = request_id,
operation_id = %operation_id,
"offload - stage 2 complete"
"offload to {} - stage 2 complete",
storage_name
);
// 3. Issue the offload request using `leader`
let block_xfer_req = BlockTransferRequest {
from_pool: BlockTransferPool::Device,
to_pool: BlockTransferPool::Host,
to_pool: transfer_pool,
blocks: block_pairs,
connector_req: Some(LeaderTransferRequest {
request_id: offload_req.request_id.clone(),
......@@ -1348,13 +1405,17 @@ async fn process_offload_request(
tracing::debug!(
request_id = request_id,
operation_id = %operation_id,
"offload - stage 3 complete"
"offload to {} - stage 3 complete",
storage_name
);
// 4. Wait for the offload request to complete
match notify_receiver.await {
Ok(_) => {
tracing::debug!("Offloading transfer completed successfully");
tracing::debug!(
"Offloading transfer to {} completed successfully",
storage_name
);
}
Err(_) => {
return Err(anyhow::anyhow!(
......@@ -1365,26 +1426,21 @@ async fn process_offload_request(
tracing::debug!(
request_id = request_id,
operation_id = %operation_id,
"offload - stage 4 complete"
"offload to {} - stage 4 complete",
storage_name
);
kvbm_metrics
.offload_blocks_d2h
.inc_by(blocks_to_register.len() as u64);
// 5. Register the mutable blocks
let immutable_blocks = block_manager
.host()
.unwrap()
.register_blocks(blocks_to_register)
.await?;
let immutable_blocks = storage_pool.register_blocks(blocks_to_register).await?;
tracing::debug!(
request_id = request_id,
operation_id = %operation_id,
"registered {} blocks",
immutable_blocks.len()
"registered {} blocks to {}",
immutable_blocks.len(),
storage_name
);
Ok(())
}
......
......@@ -211,3 +211,38 @@ impl KvBlockManagerConfig {
KvBlockManagerConfigBuilder::default()
}
}
/// Determines if CPU memory (G2) should be bypassed for direct G1->G3 (Device->Disk) offloading.
///
/// Returns `true` if:
/// - Disk cache env vars are set (`DYN_KVBM_DISK_CACHE_GB` or `DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS`)
/// AND their values are non-zero
/// - AND CPU cache env vars are NOT set (`DYN_KVBM_CPU_CACHE_GB` or `DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS`)
/// OR their values are zero (treated as not set)
pub fn should_bypass_cpu_cache() -> bool {
let cpu_cache_gb_set = std::env::var("DYN_KVBM_CPU_CACHE_GB")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.map(|v| v > 0)
.unwrap_or(false);
let cpu_cache_override_set = std::env::var("DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.map(|v| v > 0)
.unwrap_or(false);
let disk_cache_gb_set = std::env::var("DYN_KVBM_DISK_CACHE_GB")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.map(|v| v > 0)
.unwrap_or(false);
let disk_cache_override_set = std::env::var("DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.map(|v| v > 0)
.unwrap_or(false);
let cpu_cache_set = cpu_cache_gb_set || cpu_cache_override_set;
let disk_cache_set = disk_cache_gb_set || disk_cache_override_set;
disk_cache_set && !cpu_cache_set
}
......@@ -81,29 +81,23 @@ impl KvbmLeaderConfig {
pub fn sanity_check(&self) -> anyhow::Result<()> {
let cpu = &self.host_blocks_config;
let disk = &self.disk_blocks_config;
if cpu.num_blocks_overriden == 0 && cpu.cache_size_in_gb == 0.0 {
if disk.num_blocks_overriden == 0 && disk.cache_size_in_gb == 0.0 {
let cpu_configured = cpu.num_blocks_overriden > 0 || cpu.cache_size_in_gb > 0.0;
let disk_configured = disk.num_blocks_overriden > 0 || disk.cache_size_in_gb > 0.0;
if !cpu_configured && !disk_configured {
panic!(
"KVBM Configuration Error: No CPU memory configured.\n\
"KVBM Configuration Error: At least one cache tier must be configured.\n\
\n\
To fix this, set one of the following environment variables:\n\
Configure CPU cache (G2) for CPU memory offloading:\n\
• DYN_KVBM_CPU_CACHE_GB=<size_in_gb> (e.g., DYN_KVBM_CPU_CACHE_GB=4)\n\
• DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS=<num_blocks> (e.g., DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS=1000)\n\
\n\
Example: export DYN_KVBM_CPU_CACHE_GB=4"
);
} else {
panic!(
"KVBM Configuration Error: CPU memory must be configured before disk memory.\n\
OR configure disk cache (G3) for direct GPU->Disk offloading:\n\
• DYN_KVBM_DISK_CACHE_GB=<size_in_gb> (e.g., DYN_KVBM_DISK_CACHE_GB=8)\n\
• DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS=<num_blocks>\n\
\n\
To fix this, set one of the following environment variables:\n\
• DYN_KVBM_CPU_CACHE_GB=<size_in_gb> (e.g., DYN_KVBM_CPU_CACHE_GB=4)\n\
• DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS=<num_blocks> (e.g., DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS=1000)\n\
\n\
Example: export DYN_KVBM_CPU_CACHE_GB=4"
Note: If only disk cache is configured, KVBM will offload directly from GPU (G1) to Disk (G3), bypassing CPU memory (G2)."
);
}
}
Ok(())
}
}
......
......@@ -197,6 +197,7 @@ impl BlockTransferHandler {
let notify = match (request.from_pool(), request.to_pool()) {
(Device, Host) => self.begin_transfer(&self.device, &self.host, request).await,
(Device, Disk) => self.begin_transfer(&self.device, &self.disk, request).await,
(Host, Device) => self.begin_transfer(&self.host, &self.device, request).await,
(Host, Disk) => self.begin_transfer(&self.host, &self.disk, request).await,
(Disk, Device) => self.begin_transfer(&self.disk, &self.device, request).await,
......
......@@ -4,8 +4,8 @@
use axum::Router;
use dynamo_runtime::metrics::prometheus_names::{
kvbm::{
MATCHED_TOKENS, OFFLOAD_BLOCKS_D2H, OFFLOAD_BLOCKS_H2D, ONBOARD_BLOCKS_D2D,
ONBOARD_BLOCKS_H2D,
MATCHED_TOKENS, OFFLOAD_BLOCKS_D2D, OFFLOAD_BLOCKS_D2H, OFFLOAD_BLOCKS_H2D,
ONBOARD_BLOCKS_D2D, ONBOARD_BLOCKS_H2D,
},
sanitize_prometheus_name,
};
......@@ -23,6 +23,9 @@ pub struct KvbmMetrics {
// number of blocks offloaded from host to disk
pub offload_blocks_h2d: IntCounter,
// number of blocks offloaded from device to disk (bypassing host memory)
pub offload_blocks_d2d: IntCounter,
// number of blocks onboarded from host to device
pub onboard_blocks_h2d: IntCounter,
......@@ -54,6 +57,13 @@ impl KvbmMetrics {
&[],
)
.unwrap();
let offload_blocks_d2d = mr
.create_intcounter(
OFFLOAD_BLOCKS_D2D,
"The number of offload blocks from device to disk (bypassing host memory)",
&[],
)
.unwrap();
let onboard_blocks_h2d = mr
.create_intcounter(
ONBOARD_BLOCKS_H2D,
......@@ -77,6 +87,7 @@ impl KvbmMetrics {
return Self {
offload_blocks_d2h,
offload_blocks_h2d,
offload_blocks_d2d,
onboard_blocks_h2d,
onboard_blocks_d2d,
matched_tokens,
......@@ -130,6 +141,7 @@ impl KvbmMetrics {
Self {
offload_blocks_d2h,
offload_blocks_h2d,
offload_blocks_d2d,
onboard_blocks_h2d,
onboard_blocks_d2d,
matched_tokens,
......
......@@ -80,6 +80,8 @@ pub struct OffloadManagerConfig {
pub model_config: KvManagerModelConfig,
/// Optional KVBM-level metrics for tracking offload/onboard operations
pub kvbm_metrics: Option<crate::block_manager::metrics_kvbm::KvbmMetrics>,
/// If true, offload directly from device (G1) to disk (G3), bypassing host (G2)
pub bypass_cpu_mem: bool,
}
/// The offload manager handles all block transfers between different cache levels.
......@@ -93,6 +95,10 @@ pub struct OffloadManager<Locality: LocalityProvider, Metadata: BlockMetadata> {
device_offload_tx: mpsc::UnboundedSender<OffloadRequest<DeviceStorage, Locality, Metadata>>,
host_offload_tx: mpsc::UnboundedSender<OffloadRequest<PinnedStorage, Locality, Metadata>>,
/// Queue of device-to-disk direct offloading requests (bypass CPU memory)
device_to_disk_offload_tx:
mpsc::UnboundedSender<OffloadRequest<DeviceStorage, Locality, Metadata>>,
/// Queue of pending onboarding requests.
host_onboard_tx:
mpsc::UnboundedSender<OnboardRequest<PinnedStorage, DeviceStorage, Locality, Metadata>>,
......@@ -101,6 +107,9 @@ pub struct OffloadManager<Locality: LocalityProvider, Metadata: BlockMetadata> {
/// An incrementing counter for offloaded blocks. Within the same priority, blocks with lower tick values are processed first.
tick: Arc<AtomicU64>,
/// If true, offload directly from device (G1) to disk (G3), bypassing host (G2)
bypass_cpu_mem: bool,
}
impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
......@@ -116,6 +125,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
) -> Result<Arc<Self>> {
let (device_offload_tx, device_offload_rx) = mpsc::unbounded_channel();
let (host_offload_tx, host_offload_rx) = mpsc::unbounded_channel();
let (device_to_disk_offload_tx, device_to_disk_offload_rx) = mpsc::unbounded_channel();
let (host_onboard_tx, host_onboard_rx) = mpsc::unbounded_channel();
let (disk_onboard_tx, disk_onboard_rx) = mpsc::unbounded_channel();
......@@ -126,9 +136,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
device,
device_offload_tx,
host_offload_tx,
device_to_disk_offload_tx,
host_onboard_tx,
disk_onboard_tx,
tick: Arc::new(AtomicU64::new(0)),
bypass_cpu_mem: config.bypass_cpu_mem,
});
let cuda_ctx = Cuda::device_or_create(0)?;
......@@ -270,6 +282,43 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
)?
.detach();
// Device -> Disk direct offload (bypass CPU memory)
if config.bypass_cpu_mem {
tracing::info!(
"G1->G3 direct offload enabled: Device will offload directly to Disk, bypassing Host memory (CPU cache disabled)"
);
let device_to_disk_task = OffloadManager::offload_worker(
this.device.clone(),
this.disk.clone(),
device_to_disk_offload_rx,
Arc::new(TransferBatcher::new(
LocalTransferManager::new(
transfer_ctx.clone(),
MAX_CONCURRENT_TRANSFERS,
&config.async_rt_handle,
config.cancellation_token.clone(),
)?,
MAX_TRANSFER_BATCH_SIZE,
&config.async_rt_handle,
config.cancellation_token.clone(),
)),
filters.device.clone(),
config
.kvbm_metrics
.as_ref()
.map(|m| m.offload_blocks_d2d.clone()),
config.cancellation_token.clone(),
);
CriticalTaskExecutionHandle::new_with_runtime(
|_| device_to_disk_task,
config.cancellation_token.clone(),
"Device -> Disk direct offload worker (bypass CPU)",
&config.async_rt_handle,
)?
.detach();
}
Ok(this)
}
......@@ -459,7 +508,26 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
if let Some(device_block) =
any_block.downcast_ref::<ImmutableBlock<DeviceStorage, Locality, Metadata>>()
{
// The host pool doesn't exist, so we can't offload to it.
// Check if we should bypass CPU memory and go directly to disk
if self.bypass_cpu_mem && self.disk.is_some() {
// Offload directly from Device (G1) to Disk (G3), bypassing Host (G2)
if self.device_to_disk_offload_tx.is_closed() {
return Ok(());
}
let request = OffloadRequest {
block: Arc::downgrade(device_block.mutable_block()),
sequence_hash: device_block.sequence_hash(),
key,
};
tracing::debug!(
"Offloading device block {} directly to disk (bypassing host memory)",
device_block.sequence_hash()
);
self.device_to_disk_offload_tx.send(request).unwrap();
} else {
// Standard path: Device (G1) -> Host (G2)
if self.device_offload_tx.is_closed() {
return Ok(());
}
......@@ -471,10 +539,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
};
self.device_offload_tx.send(request).unwrap();
}
} else if let Some(host_block) =
any_block.downcast_ref::<ImmutableBlock<PinnedStorage, Locality, Metadata>>()
{
// The disk pool doesn't exist, so we can't offload to it.
// Host (G2) -> Disk (G3) offload
if self.host_offload_tx.is_closed() {
return Ok(());
}
......@@ -725,6 +794,7 @@ mod tests {
inner_dim,
LayoutType::FullyContiguous,
BlockRegistrationDuplicationSetting::Disabled,
false,
)
}
......@@ -736,6 +806,7 @@ mod tests {
inner_dim: Option<usize>,
layout_type: LayoutType,
duplication_setting: BlockRegistrationDuplicationSetting,
bypass_cpu_mem: bool,
) -> Result<(
Arc<OffloadManager<Local, BasicMetadata>>,
DevicePool,
......@@ -805,6 +876,7 @@ mod tests {
cancellation_token: CancellationToken::new(),
model_config: minimal_config,
kvbm_metrics: None,
bypass_cpu_mem,
};
let manager = OffloadManager::new(
......@@ -996,6 +1068,7 @@ mod tests {
None,
layout_type,
BlockRegistrationDuplicationSetting::Disabled,
false,
)?;
let device_pool = device_pool.as_ref().unwrap();
......@@ -1037,6 +1110,71 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_offload_device_to_disk_bypass_cpu() -> Result<()> {
let (offload_manager, device_pool, host_pool, disk_pool) = build_pools_with_layout(
4,
Some(4),
Some(4),
None,
LayoutType::FullyContiguous,
BlockRegistrationDuplicationSetting::Disabled,
true,
)?;
let device_pool = device_pool.as_ref().unwrap();
let host_pool = host_pool.as_ref().unwrap();
let disk_pool = disk_pool.as_ref().unwrap();
// Create a block and register it with the offload manager
let block = completed_block(device_pool, [0, 1, 2, 3]).await?;
let immutable_device_block = device_pool
.register_blocks(vec![block])
.await?
.into_iter()
.next()
.ok_or(anyhow::anyhow!("Failed to register block"))?;
populate_block(&immutable_device_block, 42)?;
// Synchronize ALL CUDA streams to ensure populate_block completes before offload starts
// This is critical because cudaMemset uses the default stream, but GDS transfer uses a different stream
unsafe {
cudaDeviceSynchronize().result()?;
}
// Offloads should only go to G3 directly since bypass_cpu_mem is true in offload_manager config
offload_manager.offload(&immutable_device_block, 0).await?;
// Wait for it to be processed.
// TODO: This is a bit of a hack, and may lead to non-deterministic behavior.
// In theory, the offload + memcpy should take much less time than this.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
// Check that the block exists in the host pool
let disk_blocks = disk_pool
.match_sequence_hashes(vec![immutable_device_block.sequence_hash()].as_slice())
.await?;
assert_eq!(disk_blocks.len(), 1);
assert_eq!(
disk_blocks[0].sequence_hash(),
immutable_device_block.sequence_hash()
);
check_block_contents(&immutable_device_block, &disk_blocks[0], 42)?;
let host_blocks = host_pool
.match_sequence_hashes(vec![immutable_device_block.sequence_hash()].as_slice())
.await?;
// since host is bypassed, there should be no host blocks
assert_eq!(host_blocks.len(), 0);
Ok(())
}
#[tokio::test]
async fn test_no_host_blocks_available() -> Result<()> {
let (offload_manager, device_pool, host_pool, _) = build_pools(4, Some(4), None, None)?;
......@@ -1098,6 +1236,7 @@ mod tests {
None,
layout_type,
BlockRegistrationDuplicationSetting::Disabled,
false,
)?;
let device_pool = device_pool.as_ref().unwrap();
......@@ -1163,6 +1302,7 @@ mod tests {
None,
layout_type,
BlockRegistrationDuplicationSetting::Disabled,
false,
)?;
let device_pool = device_pool.as_ref().unwrap();
......@@ -1291,6 +1431,7 @@ mod tests {
None,
layout_type,
BlockRegistrationDuplicationSetting::Disabled,
false,
)?;
let host_pool = host_pool.as_ref().unwrap();
......@@ -1337,6 +1478,7 @@ mod tests {
None,
layout_type,
BlockRegistrationDuplicationSetting::Disabled,
false,
)?;
let device_pool = device_pool.as_ref().unwrap();
......@@ -1387,6 +1529,7 @@ mod tests {
None,
layout_type,
BlockRegistrationDuplicationSetting::Disabled,
false,
)?;
let disk_pool = disk_pool.as_ref().unwrap();
......@@ -1496,6 +1639,7 @@ mod tests {
Some(GDS_ALIGNMENT), // Use GDS-friendly alignment
layout_type,
BlockRegistrationDuplicationSetting::Disabled,
false,
)?;
let host_pool = host_pool.as_ref().unwrap();
......@@ -1602,6 +1746,7 @@ mod tests {
None, // inner_dim
LayoutType::FullyContiguous,
BlockRegistrationDuplicationSetting::Disabled,
false,
)?;
let disk_pool = disk_pool
......@@ -1672,6 +1817,7 @@ mod tests {
None,
LayoutType::FullyContiguous,
BlockRegistrationDuplicationSetting::Disabled,
false,
);
match result {
......@@ -1702,6 +1848,7 @@ mod tests {
None, // inner_dim
LayoutType::FullyContiguous,
BlockRegistrationDuplicationSetting::Disabled,
false,
);
// This should succeed, but we'll test behavior under constrained conditions
......@@ -1737,6 +1884,7 @@ mod tests {
Some(4096), // GDS-friendly alignment
LayoutType::FullyContiguous,
BlockRegistrationDuplicationSetting::Disabled,
false,
)?;
let host_pool = host_pool.as_ref().unwrap();
......@@ -1811,6 +1959,7 @@ mod tests {
outer_contiguous: false,
}, // Most complex
BlockRegistrationDuplicationSetting::Disabled,
false,
)
}
......
......@@ -142,6 +142,9 @@ impl<R: LogicalResources, Metadata: BlockMetadata>
}
};
// Determine if we should bypass CPU memory (G2) and offload directly from GPU (G1) to Disk (G3)
let bypass_cpu_mem = config::should_bypass_cpu_cache();
let offload_filters = OffloadFilters::builder()
.device(device_offload_filter)
.host(host_offload_filter)
......@@ -154,6 +157,7 @@ impl<R: LogicalResources, Metadata: BlockMetadata>
cancellation_token: resources.cancellation_token.clone(),
model_config,
kvbm_metrics: resources.config.kvbm_metrics.clone(),
bypass_cpu_mem,
};
let offload_manager = OffloadManager::new(
......@@ -270,12 +274,16 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> {
.disk(disk_offload_filter)
.build()?;
// Determine if we should bypass CPU memory (G2) and offload directly from GPU (G1) to Disk (G3)
let bypass_cpu_mem = config::should_bypass_cpu_cache();
let offload_config = OffloadManagerConfig {
nixl_agent: resources.nixl_agent.clone(),
async_rt_handle: resources.async_rt_handle.clone(),
cancellation_token: resources.cancellation_token.clone(),
model_config,
kvbm_metrics: resources.config.kvbm_metrics.clone(),
bypass_cpu_mem,
};
let offload_manager = OffloadManager::new(
......
......@@ -325,6 +325,9 @@ pub mod kvbm {
/// The number of offload blocks from host to disk
pub const OFFLOAD_BLOCKS_H2D: &str = "offload_blocks_h2d";
/// The number of offload blocks from device to disk (bypassing host memory)
pub const OFFLOAD_BLOCKS_D2D: &str = "offload_blocks_d2d";
/// The number of onboard blocks from host to device
pub const ONBOARD_BLOCKS_H2D: &str = "onboard_blocks_h2d";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment