Unverified Commit b94ecd16 authored by Ziqi Fan's avatar Ziqi Fan Committed by GitHub
Browse files

feat: add disk offloading filtering in KVBM (#3532)


Signed-off-by: default avatarZiqi Fan <ziqif@nvidia.com>
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
parent 7a7d397c
...@@ -58,6 +58,11 @@ export DYN_KVBM_DISK_CACHE_GB=8 ...@@ -58,6 +58,11 @@ export DYN_KVBM_DISK_CACHE_GB=8
export DYN_KVBM_LEADER_WORKER_INIT_TIMEOUT_SECS=1200 export DYN_KVBM_LEADER_WORKER_INIT_TIMEOUT_SECS=1200
``` ```
> [!NOTE]
> When disk offloading is enabled, to extend SSD lifespan, disk offload filtering would be enabled by default. The current policy is only offloading KV blocks from CPU to disk if the blocks have frequency equal or more than `2`. Frequency is determined via doubling on cache hit (init with 1) and decrement by 1 on each time decay step.
>
> To disable disk offload filtering, set `DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER` to true or 1.
```bash ```bash
# write an example LLM API config # write an example LLM API config
# Note: Disable partial reuse "enable_partial_reuse: false" in the LLM API config’s "kv_connector_config" to increase offloading cache hits. # Note: Disable partial reuse "enable_partial_reuse: false" in the LLM API config’s "kv_connector_config" to increase offloading cache hits.
......
...@@ -61,6 +61,11 @@ cd $DYNAMO_HOME/components/backends/vllm ...@@ -61,6 +61,11 @@ cd $DYNAMO_HOME/components/backends/vllm
> [!NOTE] > [!NOTE]
> `DYN_KVBM_CPU_CACHE_GB` must be set and `DYN_KVBM_DISK_CACHE_GB` is optional. > `DYN_KVBM_CPU_CACHE_GB` must be set and `DYN_KVBM_DISK_CACHE_GB` is optional.
> [!NOTE]
> When disk offloading is enabled, to extend SSD lifespan, disk offload filtering would be enabled by default. The current policy is only offloading KV blocks from CPU to disk if the blocks have frequency equal or more than `2`. Frequency is determined via doubling on cache hit (init with 1) and decrement by 1 on each time decay step.
>
> To disable disk offload filtering, set `DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER` to true or 1.
### Sample Request ### Sample Request
```bash ```bash
# make a request to verify vLLM with KVBM is started up correctly # make a request to verify vLLM with KVBM is started up correctly
......
...@@ -6,8 +6,11 @@ use anyhow::Result; ...@@ -6,8 +6,11 @@ use anyhow::Result;
use dynamo_llm::block_manager::block::{ use dynamo_llm::block_manager::block::{
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical, data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical,
}; };
use dynamo_llm::block_manager::offload::filter::FrequencyFilter;
use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy}; use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy};
use pyo3::PyResult; use pyo3::PyResult;
use std::time::Duration;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
mod controller; mod controller;
...@@ -94,13 +97,34 @@ impl BlockManager { ...@@ -94,13 +97,34 @@ impl BlockManager {
if leader.num_host_blocks() > 0 { if leader.num_host_blocks() > 0 {
tracing::info!("Using {} host blocks", leader.num_host_blocks()); tracing::info!("Using {} host blocks", leader.num_host_blocks());
config = config.host_layout(
let mut host_layout_config =
dynamo_llm::block_manager::KvManagerLayoutConfig::builder() dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
.num_blocks(leader.num_host_blocks()) .num_blocks(leader.num_host_blocks())
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded));
.build()
.map_err(to_pyerr)?, if leader.num_disk_blocks() > 0 {
); // Check if disk offload filter is disabled via environment variable
let disable_filter = std::env::var("DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER")
.map(|v| v == "true" || v == "1")
.unwrap_or(false);
if !disable_filter {
// TODO: These values seem plausible for most use cases, but we need to figure out a better way to configure them.
let frequency_filter = FrequencyFilter::new(
2,
Duration::from_secs(600),
1e6 as usize,
cancel_token.child_token(),
rt.inner().runtime().primary().clone(),
)
.map_err(to_pyerr)?;
host_layout_config =
host_layout_config.offload_filter(Some(Arc::new(frequency_filter)));
}
}
config = config.host_layout(host_layout_config.build().map_err(to_pyerr)?);
} }
if leader.num_disk_blocks() > 0 { if leader.num_disk_blocks() > 0 {
......
...@@ -33,7 +33,7 @@ pub use block::{ ...@@ -33,7 +33,7 @@ pub use block::{
pub use config::*; pub use config::*;
pub use layout::{LayoutConfig, LayoutConfigBuilder, LayoutError, LayoutType, nixl::NixlLayout}; pub use layout::{LayoutConfig, LayoutConfigBuilder, LayoutError, LayoutType, nixl::NixlLayout};
pub use offload::request::BlockResult; pub use offload::{filter::OffloadFilter, request::BlockResult};
pub use pool::{BlockPool, ManagedBlockPool}; pub use pool::{BlockPool, ManagedBlockPool};
pub use storage::{ pub use storage::{
DeviceStorage, DiskStorage, PinnedStorage, Storage, StorageAllocator, DeviceStorage, DiskStorage, PinnedStorage, Storage, StorageAllocator,
......
...@@ -6,7 +6,7 @@ pub mod logical; ...@@ -6,7 +6,7 @@ pub mod logical;
pub use local::LocalBlockDataFactory; pub use local::LocalBlockDataFactory;
use crate::block_manager::LayoutConfig; use crate::block_manager::{LayoutConfig, OffloadFilter};
use super::*; use super::*;
...@@ -47,6 +47,9 @@ pub trait BlockFactory<S: Storage, L: LocalityProvider> { ...@@ -47,6 +47,9 @@ pub trait BlockFactory<S: Storage, L: LocalityProvider> {
/// Get the layout configuration information /// Get the layout configuration information
fn layout_config(&self) -> &LayoutConfig; fn layout_config(&self) -> &LayoutConfig;
/// Get the offload filter for this factory
fn offload_filter(&self) -> Option<Arc<dyn OffloadFilter>>;
} }
/// Extension trait for factories that can produce all blocks at once /// Extension trait for factories that can produce all blocks at once
......
...@@ -8,6 +8,7 @@ pub struct LocalBlockDataFactory<S: Storage> { ...@@ -8,6 +8,7 @@ pub struct LocalBlockDataFactory<S: Storage> {
layout: Arc<dyn BlockLayout<StorageType = S>>, layout: Arc<dyn BlockLayout<StorageType = S>>,
block_set_idx: usize, block_set_idx: usize,
worker_id: WorkerID, worker_id: WorkerID,
offload_filter: Option<Arc<dyn OffloadFilter>>,
} }
impl<S: Storage> LocalBlockDataFactory<S> { impl<S: Storage> LocalBlockDataFactory<S> {
...@@ -15,11 +16,13 @@ impl<S: Storage> LocalBlockDataFactory<S> { ...@@ -15,11 +16,13 @@ impl<S: Storage> LocalBlockDataFactory<S> {
layout: Arc<dyn BlockLayout<StorageType = S>>, layout: Arc<dyn BlockLayout<StorageType = S>>,
block_set_idx: usize, block_set_idx: usize,
worker_id: WorkerID, worker_id: WorkerID,
offload_filter: Option<Arc<dyn OffloadFilter>>,
) -> Self { ) -> Self {
Self { Self {
layout, layout,
block_set_idx, block_set_idx,
worker_id, worker_id,
offload_filter,
} }
} }
} }
...@@ -46,6 +49,10 @@ impl<S: Storage> BlockFactory<S, locality::Local> for LocalBlockDataFactory<S> { ...@@ -46,6 +49,10 @@ impl<S: Storage> BlockFactory<S, locality::Local> for LocalBlockDataFactory<S> {
fn layout_config(&self) -> &LayoutConfig { fn layout_config(&self) -> &LayoutConfig {
self.layout.config() self.layout.config()
} }
fn offload_filter(&self) -> Option<Arc<dyn OffloadFilter>> {
self.offload_filter.clone()
}
} }
impl<S: Storage> IntoBlocks<S, locality::Local> for LocalBlockDataFactory<S> {} impl<S: Storage> IntoBlocks<S, locality::Local> for LocalBlockDataFactory<S> {}
...@@ -2,7 +2,10 @@ ...@@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use super::*; use super::*;
use crate::block_manager::locality::{Logical, LogicalBlockData, LogicalResources}; use crate::block_manager::{
OffloadFilter,
locality::{Logical, LogicalBlockData, LogicalResources},
};
#[derive(Debug)] #[derive(Debug)]
pub struct LogicalBlockFactory<S: Storage, R: LogicalResources> { pub struct LogicalBlockFactory<S: Storage, R: LogicalResources> {
...@@ -12,6 +15,7 @@ pub struct LogicalBlockFactory<S: Storage, R: LogicalResources> { ...@@ -12,6 +15,7 @@ pub struct LogicalBlockFactory<S: Storage, R: LogicalResources> {
resources: Arc<R>, resources: Arc<R>,
storage_type: StorageType, storage_type: StorageType,
storage: std::marker::PhantomData<S>, storage: std::marker::PhantomData<S>,
offload_filter: Option<Arc<dyn OffloadFilter>>,
} }
impl<S: Storage, R: LogicalResources> LogicalBlockFactory<S, R> { impl<S: Storage, R: LogicalResources> LogicalBlockFactory<S, R> {
...@@ -21,6 +25,7 @@ impl<S: Storage, R: LogicalResources> LogicalBlockFactory<S, R> { ...@@ -21,6 +25,7 @@ impl<S: Storage, R: LogicalResources> LogicalBlockFactory<S, R> {
worker_id: WorkerID, worker_id: WorkerID,
resources: Arc<R>, resources: Arc<R>,
storage_type: StorageType, storage_type: StorageType,
offload_filter: Option<Arc<dyn OffloadFilter>>,
) -> Self { ) -> Self {
Self { Self {
layout_config, layout_config,
...@@ -29,6 +34,7 @@ impl<S: Storage, R: LogicalResources> LogicalBlockFactory<S, R> { ...@@ -29,6 +34,7 @@ impl<S: Storage, R: LogicalResources> LogicalBlockFactory<S, R> {
resources, resources,
storage_type, storage_type,
storage: std::marker::PhantomData, storage: std::marker::PhantomData,
offload_filter,
} }
} }
} }
...@@ -57,6 +63,10 @@ impl<S: Storage, R: LogicalResources> BlockFactory<S, Logical<R>> for LogicalBlo ...@@ -57,6 +63,10 @@ impl<S: Storage, R: LogicalResources> BlockFactory<S, Logical<R>> for LogicalBlo
fn layout_config(&self) -> &LayoutConfig { fn layout_config(&self) -> &LayoutConfig {
&self.layout_config &self.layout_config
} }
fn offload_filter(&self) -> Option<Arc<dyn OffloadFilter>> {
self.offload_filter.clone()
}
} }
impl<S: Storage, R: LogicalResources> IntoBlocks<S, Logical<R>> for LogicalBlockFactory<S, R> {} impl<S: Storage, R: LogicalResources> IntoBlocks<S, Logical<R>> for LogicalBlockFactory<S, R> {}
...@@ -89,6 +99,7 @@ mod tests { ...@@ -89,6 +99,7 @@ mod tests {
TEST_WORKER_ID, TEST_WORKER_ID,
Arc::new(NullResources), Arc::new(NullResources),
StorageType::Pinned, StorageType::Pinned,
None,
); );
let block_data = factory.create_block_data(0).unwrap(); let block_data = factory.create_block_data(0).unwrap();
......
...@@ -116,6 +116,11 @@ pub struct KvManagerLayoutConfig<S: Storage + NixlRegisterableStorage> { ...@@ -116,6 +116,11 @@ pub struct KvManagerLayoutConfig<S: Storage + NixlRegisterableStorage> {
/// The type of block parallelism strategy to use /// The type of block parallelism strategy to use
#[builder(default)] #[builder(default)]
pub logical: Option<BlockParallelismStrategy>, pub logical: Option<BlockParallelismStrategy>,
/// The offload filter to use (if any).
/// This dictates which blocks will be offloaded to the next-lowest cache level.
#[builder(default = "None")]
pub offload_filter: Option<Arc<dyn OffloadFilter>>,
} }
impl<S: Storage + NixlRegisterableStorage> KvManagerLayoutConfig<S> { impl<S: Storage + NixlRegisterableStorage> KvManagerLayoutConfig<S> {
......
...@@ -95,7 +95,7 @@ pub enum ResetResponse { ...@@ -95,7 +95,7 @@ pub enum ResetResponse {
ResetBlocks(ResetBlocksResponse), ResetBlocks(ResetBlocksResponse),
} }
#[cfg(all(test, feature = "testing-full"))] #[cfg(all(test, feature = "testing-etcd", feature = "testing-full"))]
mod tests { mod tests {
use crate::tokens::Tokens; use crate::tokens::Tokens;
......
...@@ -42,10 +42,12 @@ use super::pool::{BlockPool, BlockPoolError}; ...@@ -42,10 +42,12 @@ use super::pool::{BlockPool, BlockPoolError};
use super::storage::{Cuda, Storage}; use super::storage::{Cuda, Storage};
use super::{DeviceStorage, DiskStorage, KvManagerModelConfig, PinnedStorage}; use super::{DeviceStorage, DiskStorage, KvManagerModelConfig, PinnedStorage};
use nixl_sys::Agent as NixlAgent; use nixl_sys::Agent as NixlAgent;
use std::sync::Arc; use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio::sync::{ use tokio::sync::{
Mutex,
mpsc::{self, error::TryRecvError}, mpsc::{self, error::TryRecvError},
oneshot, oneshot,
}; };
...@@ -56,12 +58,16 @@ use std::any::Any; ...@@ -56,12 +58,16 @@ use std::any::Any;
use std::collections::BTreeSet; use std::collections::BTreeSet;
pub mod filter;
mod pending; mod pending;
pub mod request; pub mod request;
use filter::OffloadFilter;
use pending::{LocalTransferManager, PendingTransfer, TransferBatcher, TransferManager}; use pending::{LocalTransferManager, PendingTransfer, TransferBatcher, TransferManager};
use request::{BlockResult, OffloadRequest, OffloadRequestKey, OnboardRequest}; use request::{BlockResult, OffloadRequest, OffloadRequestKey, OnboardRequest};
use derive_builder::Builder;
use derive_getters::Getters;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle; use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
pub const MAX_CONCURRENT_TRANSFERS: usize = 4; pub const MAX_CONCURRENT_TRANSFERS: usize = 4;
...@@ -94,16 +100,18 @@ pub struct OffloadManager<Locality: LocalityProvider, Metadata: BlockMetadata> { ...@@ -94,16 +100,18 @@ pub struct OffloadManager<Locality: LocalityProvider, Metadata: BlockMetadata> {
mpsc::UnboundedSender<OnboardRequest<DiskStorage, DeviceStorage, Locality, Metadata>>, mpsc::UnboundedSender<OnboardRequest<DiskStorage, DeviceStorage, Locality, Metadata>>,
/// An incrementing counter for offloaded blocks. Within the same priority, blocks with lower tick values are processed first. /// An incrementing counter for offloaded blocks. Within the same priority, blocks with lower tick values are processed first.
tick: Arc<Mutex<u64>>, tick: Arc<AtomicU64>,
} }
impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
OffloadManager<Locality, Metadata> OffloadManager<Locality, Metadata>
{ {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
disk: Option<Arc<dyn BlockPool<DiskStorage, Locality, Metadata>>>, disk: Option<Arc<dyn BlockPool<DiskStorage, Locality, Metadata>>>,
host: Option<Arc<dyn BlockPool<PinnedStorage, Locality, Metadata>>>, host: Option<Arc<dyn BlockPool<PinnedStorage, Locality, Metadata>>>,
device: Option<Arc<dyn BlockPool<DeviceStorage, Locality, Metadata>>>, device: Option<Arc<dyn BlockPool<DeviceStorage, Locality, Metadata>>>,
filters: OffloadFilters,
config: OffloadManagerConfig, config: OffloadManagerConfig,
) -> Result<Arc<Self>> { ) -> Result<Arc<Self>> {
let (device_offload_tx, device_offload_rx) = mpsc::unbounded_channel(); let (device_offload_tx, device_offload_rx) = mpsc::unbounded_channel();
...@@ -120,7 +128,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -120,7 +128,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
host_offload_tx, host_offload_tx,
host_onboard_tx, host_onboard_tx,
disk_onboard_tx, disk_onboard_tx,
tick: Arc::new(Mutex::new(0)), tick: Arc::new(AtomicU64::new(0)),
}); });
let cuda_ctx = Cuda::device_or_create(0)?; let cuda_ctx = Cuda::device_or_create(0)?;
...@@ -163,6 +171,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -163,6 +171,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
filters.device.clone(),
device_metrics.clone(), device_metrics.clone(),
config.cancellation_token.clone(), config.cancellation_token.clone(),
); );
...@@ -199,6 +208,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -199,6 +208,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
filters.host.clone(),
host_metrics.clone(), host_metrics.clone(),
config.cancellation_token.clone(), config.cancellation_token.clone(),
); );
...@@ -276,6 +286,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -276,6 +286,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
target_pool: Option<Arc<dyn BlockPool<Target, Locality, Metadata>>>, target_pool: Option<Arc<dyn BlockPool<Target, Locality, Metadata>>>,
mut offload_rx: mpsc::UnboundedReceiver<OffloadRequest<Source, Locality, Metadata>>, mut offload_rx: mpsc::UnboundedReceiver<OffloadRequest<Source, Locality, Metadata>>,
transfer_manager: Arc<dyn TransferManager<Source, Target, Locality, Metadata>>, transfer_manager: Arc<dyn TransferManager<Source, Target, Locality, Metadata>>,
offload_filter: Option<Arc<dyn OffloadFilter>>,
pool_metrics: Arc<PoolMetrics>, pool_metrics: Arc<PoolMetrics>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> Result<()> { ) -> Result<()> {
...@@ -331,6 +342,12 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -331,6 +342,12 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
continue; continue;
} }
if let Some(offload_filter) = offload_filter.as_ref()
&& !offload_filter.should_offload(request.sequence_hash)
{
continue;
}
let target_block = 'target_block: { let target_block = 'target_block: {
if let Ok(blocks) = target_pool.allocate_blocks(1).await if let Ok(blocks) = target_pool.allocate_blocks(1).await
&& let Some(block) = blocks.into_iter().next() && let Some(block) = blocks.into_iter().next()
...@@ -443,14 +460,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -443,14 +460,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
} }
} }
let mut tick = self.tick.lock().await; let tick = self.tick.fetch_add(1, Ordering::Relaxed);
let key = OffloadRequestKey { let key = OffloadRequestKey {
priority, priority,
timestamp: *tick, timestamp: tick,
}; };
// Increment a counter for each block. Within the same priority, blocks with lower counter values are processed first.
*tick += 1;
drop(tick);
// This can get called by all pools, regardless of whether or not they have a place to offload to. // This can get called by all pools, regardless of whether or not they have a place to offload to.
// Because of this, we need to check the block type here. // Because of this, we need to check the block type here.
...@@ -584,6 +598,47 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -584,6 +598,47 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
} }
} }
#[derive(Debug, Clone, Getters, Builder)]
#[builder(pattern = "owned", build_fn(validate = "Self::validate"))]
pub struct OffloadFilters {
#[builder(default)]
device: Option<Arc<dyn OffloadFilter>>,
#[builder(default)]
host: Option<Arc<dyn OffloadFilter>>,
#[builder(default)]
disk: Option<Arc<dyn OffloadFilter>>,
}
impl OffloadFilters {
pub fn builder() -> OffloadFiltersBuilder {
OffloadFiltersBuilder::default()
}
}
impl OffloadFiltersBuilder {
pub fn validate(&self) -> Result<(), String> {
if let Some(disk) = self.disk.as_ref()
&& disk.is_some()
{
return Err("Disk offload filter is not supported.".to_string());
}
let host_is_none = if let Some(host) = self.host.as_ref() {
host.is_none()
} else {
true
};
if host_is_none {
tracing::warn!(
"Host to Disk offload filter is not provided. All blocks in host will be offloaded to disk. This may result in excessive disk offloading and accelerated SSD degradation."
);
}
Ok(())
}
}
#[cfg(all(test, feature = "testing-cuda"))] #[cfg(all(test, feature = "testing-cuda"))]
mod tests { mod tests {
use super::*; use super::*;
...@@ -771,6 +826,7 @@ mod tests { ...@@ -771,6 +826,7 @@ mod tests {
disk_pool.clone(), disk_pool.clone(),
host_pool.clone(), host_pool.clone(),
device_pool.clone(), device_pool.clone(),
OffloadFilters::builder().build()?,
config, config,
)?; )?;
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::runtime::Handle;
use tokio::sync::Notify;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::tokens::SequenceHash;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
pub trait OffloadFilter: Send + Sync + Debug {
fn should_offload(&self, sequence_hash: SequenceHash) -> bool;
}
/// A filter that offloads blocks based on their frequency of use.
///
/// The frequency of use is tracked in a map, and the filter will offload blocks that have been used more than the minimum offload frequency.
///
/// The map is pruned periodically, and will be notified if the map is too large.
///
/// The overall strategy is to double the count on increment and decrement by 1 on each decay step.
#[derive(Debug, Clone)]
pub struct FrequencyFilter {
min_offload_frequency: i64,
frequency_map: Arc<Mutex<HashMap<SequenceHash, i64>>>,
max_num_entries: usize,
oversize_notify: Arc<Notify>,
}
impl FrequencyFilter {
pub fn new(
min_offload_frequency: i64,
flush_interval: Duration,
max_num_entries: usize,
cancel_token: CancellationToken,
runtime: Handle,
) -> anyhow::Result<Self> {
let frequency_map = Arc::new(Mutex::new(HashMap::new()));
let frequency_map_clone = frequency_map.clone();
let oversize_notify = Arc::new(Notify::new());
let oversize_notify_clone = oversize_notify.clone();
CriticalTaskExecutionHandle::new_with_runtime(
move |cancel_token| async move {
let mut interval = tokio::time::interval(flush_interval);
loop {
tokio::select! {
// Observe cancellation and exit the loop.
_ = cancel_token.cancelled() => {
break;
}
// Prune the frequency map upon the flush interval.
_ = interval.tick() => {
let mut frequency_map = frequency_map_clone.lock().unwrap();
Self::decrement_and_prune(&mut frequency_map);
}
// Trigger a prune if we're notified that the frequency map is too large.
_ = oversize_notify_clone.notified() => {
let mut frequency_map = frequency_map_clone.lock().unwrap();
// It may take multiple rounds of pruning to sufficiently reduce the size.
while frequency_map.len() > max_num_entries {
Self::decrement_and_prune(&mut frequency_map);
}
// Reset our flush interval.
interval.reset();
}
}
}
Ok(())
},
cancel_token,
"Frequency Decay Handler",
&runtime,
)?
.detach();
Ok(Self {
min_offload_frequency,
frequency_map,
max_num_entries,
oversize_notify,
})
}
fn decrement_and_prune(frequency_map: &mut MutexGuard<HashMap<SequenceHash, i64>>) {
// Decrement all values and prune the entries with value 0.
frequency_map.retain(|_, count| {
*count -= 1;
*count > 0
});
}
}
impl OffloadFilter for FrequencyFilter {
fn should_offload(&self, sequence_hash: SequenceHash) -> bool {
let mut frequency_map = self.frequency_map.lock().unwrap();
// Double the value of the entry, or initialize it to 1.
let entry = frequency_map
.entry(sequence_hash)
.and_modify(|count| {
*count = count.saturating_mul(2);
})
.or_insert(1);
let should_offload = *entry >= self.min_offload_frequency;
// Notify the offload manager that the frequency map is too large.
if frequency_map.len() > self.max_num_entries {
self.oversize_notify.notify_one();
}
should_offload
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_filter(min_offload_frequency: i64, max_num_entries: usize) -> FrequencyFilter {
let cancel_token = CancellationToken::new();
let runtime = Handle::current();
FrequencyFilter::new(
min_offload_frequency,
Duration::from_secs(3600),
max_num_entries,
cancel_token,
runtime,
)
.unwrap()
}
fn hash(x: u32) -> SequenceHash {
SequenceHash::from(x)
}
#[tokio::test]
async fn test_basic_frequency_filter() {
let filter = make_filter(2, 100);
assert!(!filter.should_offload(hash(0)));
assert!(filter.should_offload(hash(0)));
assert!(!filter.should_offload(hash(1)));
assert!(!filter.should_offload(hash(2)));
}
#[tokio::test]
async fn test_decay() {
let filter = make_filter(4, 2);
// Add the first hashes, and bump it up to 2.
assert!(!filter.should_offload(hash(0)));
assert!(!filter.should_offload(hash(0)));
// Add the second hash
assert!(!filter.should_offload(hash(1)));
assert!(!filter.should_offload(hash(1)));
// Now, the value of the first hash is 4, so we should offload it.
assert!(filter.should_offload(hash(0)));
// This will cause a decay.
assert!(!filter.should_offload(hash(2)));
tokio::time::sleep(Duration::from_millis(100)).await;
// Now, the priority of 1
assert!(!filter.should_offload(hash(1)));
}
#[tokio::test]
async fn test_time_based_decay() {
let cancel_token = CancellationToken::new();
let runtime = Handle::current();
let filter = FrequencyFilter::new(
4,
Duration::from_millis(250),
100,
cancel_token.clone(),
runtime,
)
.unwrap();
assert!(!filter.should_offload(hash(0)));
assert!(!filter.should_offload(hash(0)));
assert!(filter.should_offload(hash(0)));
tokio::time::sleep(Duration::from_millis(300)).await;
// The count should have decayed from 4 to 2.
{
let frequency_map = filter.frequency_map.lock().unwrap();
assert_eq!(*frequency_map.get(&hash(0)).unwrap(), 2);
}
tokio::time::sleep(Duration::from_millis(250)).await;
// The count should have decayed from 2 to 1, and should be pruned.
{
let frequency_map = filter.frequency_map.lock().unwrap();
assert_eq!(*frequency_map.get(&hash(0)).unwrap(), 1);
}
tokio::time::sleep(Duration::from_millis(250)).await;
// The count should have decayed from 1 to 0, and should be pruned.
{
let frequency_map = filter.frequency_map.lock().unwrap();
assert!(frequency_map.get(&hash(0)).is_none());
}
}
#[tokio::test]
async fn test_multi_prune_decay() {
let filter = make_filter(10, 2);
assert!(!filter.should_offload(hash(0)));
assert!(!filter.should_offload(hash(1)));
assert_eq!(filter.frequency_map.lock().unwrap().len(), 2);
assert!(!filter.should_offload(hash(2)));
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(filter.frequency_map.lock().unwrap().is_empty());
assert!(!filter.should_offload(hash(3)));
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(filter.frequency_map.lock().unwrap().len(), 1);
}
}
...@@ -5,22 +5,22 @@ mod local; ...@@ -5,22 +5,22 @@ mod local;
mod logical; mod logical;
mod resources; mod resources;
use crate::block_manager::block::{MutableBlock, factory::IntoBlocks};
use crate::block_manager::locality::LogicalResources;
use crate::block_manager::offload::request::BlockResult;
use super::*; use super::*;
// use super::offload::{OffloadManager, OffloadManagerConfig}; // use super::offload::{OffloadManager, OffloadManagerConfig};
use super::{ use super::{
block::{ block::{
Block, GlobalRegistry, ImmutableBlock, factory::LocalBlockDataFactory, Block, GlobalRegistry, ImmutableBlock, MutableBlock, factory::IntoBlocks,
locality::LocalityProvider, factory::LocalBlockDataFactory, locality::LocalityProvider,
}, },
config::NixlOptions, config::NixlOptions,
events::{EventManager, NullEventManager}, events::{EventManager, NullEventManager},
locality::LogicalResources,
metrics::BlockManagerMetrics, metrics::BlockManagerMetrics,
offload::{OffloadManager, OffloadManagerConfig}, offload::{
OffloadFilters, OffloadManager, OffloadManagerConfig, filter::OffloadFilter,
request::BlockResult,
},
}; };
use derive_getters::Dissolve; use derive_getters::Dissolve;
use std::sync::Arc; use std::sync::Arc;
...@@ -110,42 +110,48 @@ impl<R: LogicalResources, Metadata: BlockMetadata> ...@@ -110,42 +110,48 @@ impl<R: LogicalResources, Metadata: BlockMetadata>
let (disk_factory, host_factory, device_factory) = block_data_factories.dissolve(); let (disk_factory, host_factory, device_factory) = block_data_factories.dissolve();
let (disk_pool, disk_blocks) = match disk_factory { let (disk_pool, disk_blocks, disk_offload_filter) = match disk_factory {
Some(factory) => { Some(factory) => {
let (pool, blocks) = let (pool, blocks, offload_filter) =
create_block_pool::<_, _, Metadata>(factory, &resources, "disk")?; create_block_pool::<_, _, Metadata>(factory, &resources, "disk")?;
(Some(pool), Some(blocks)) (Some(pool), Some(blocks), offload_filter)
} }
None => { None => {
tracing::debug!("No disk layout provided; will not allocate disk blocks."); tracing::debug!("No disk layout provided; will not allocate disk blocks.");
(None, None) (None, None, None)
} }
}; };
let (host_pool, host_blocks) = match host_factory { let (host_pool, host_blocks, host_offload_filter) = match host_factory {
Some(factory) => { Some(factory) => {
let (pool, blocks) = let (pool, blocks, offload_filter) =
create_block_pool::<_, _, Metadata>(factory, &resources, "host")?; create_block_pool::<_, _, Metadata>(factory, &resources, "host")?;
(Some(pool), Some(blocks)) (Some(pool), Some(blocks), offload_filter)
} }
None => { None => {
tracing::debug!("No host layout provided; will not allocate host blocks."); tracing::debug!("No host layout provided; will not allocate host blocks.");
(None, None) (None, None, None)
} }
}; };
let (device_pool, device_blocks) = match device_factory { let (device_pool, device_blocks, device_offload_filter) = match device_factory {
Some(factory) => { Some(factory) => {
let (pool, blocks) = let (pool, blocks, offload_filter) =
create_block_pool::<_, _, Metadata>(factory, &resources, "device")?; create_block_pool::<_, _, Metadata>(factory, &resources, "device")?;
(Some(pool), Some(blocks)) (Some(pool), Some(blocks), offload_filter)
} }
None => { None => {
tracing::debug!("No device layout provided; will not allocate device blocks."); tracing::debug!("No device layout provided; will not allocate device blocks.");
(None, None) (None, None, None)
} }
}; };
let offload_filters = OffloadFilters::builder()
.device(device_offload_filter)
.host(host_offload_filter)
.disk(disk_offload_filter)
.build()?;
let offload_config = OffloadManagerConfig { let offload_config = OffloadManagerConfig {
nixl_agent: resources.nixl_agent.clone(), nixl_agent: resources.nixl_agent.clone(),
async_rt_handle: resources.async_rt_handle.clone(), async_rt_handle: resources.async_rt_handle.clone(),
...@@ -158,6 +164,7 @@ impl<R: LogicalResources, Metadata: BlockMetadata> ...@@ -158,6 +164,7 @@ impl<R: LogicalResources, Metadata: BlockMetadata>
disk_pool.clone(), disk_pool.clone(),
host_pool.clone(), host_pool.clone(),
device_pool.clone(), device_pool.clone(),
offload_filters,
offload_config, offload_config,
)?; )?;
...@@ -219,39 +226,39 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> { ...@@ -219,39 +226,39 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> {
let (mut local_block_set, disk_factory, host_factory, device_factory) = let (mut local_block_set, disk_factory, host_factory, device_factory) =
block_data_factories.dissolve(); block_data_factories.dissolve();
let (disk_pool, disk_blocks) = match disk_factory { let (disk_pool, disk_blocks, disk_offload_filter) = match disk_factory {
Some(factory) => { Some(factory) => {
let (pool, blocks) = let (pool, blocks, offload_filter) =
create_block_pool::<_, _, Metadata>(factory, &resources, "disk")?; create_block_pool::<_, _, Metadata>(factory, &resources, "disk")?;
(Some(pool), Some(blocks)) (Some(pool), Some(blocks), offload_filter)
} }
None => { None => {
tracing::debug!("No disk layout provided; will not allocate disk blocks."); tracing::debug!("No disk layout provided; will not allocate disk blocks.");
(None, None) (None, None, None)
} }
}; };
let (host_pool, host_blocks) = match host_factory { let (host_pool, host_blocks, host_offload_filter) = match host_factory {
Some(factory) => { Some(factory) => {
let (pool, blocks) = let (pool, blocks, offload_filter) =
create_block_pool::<_, _, Metadata>(factory, &resources, "host")?; create_block_pool::<_, _, Metadata>(factory, &resources, "host")?;
(Some(pool), Some(blocks)) (Some(pool), Some(blocks), offload_filter)
} }
None => { None => {
tracing::debug!("No disk layout provided; will not allocate disk blocks."); tracing::debug!("No host layout provided; will not allocate host blocks.");
(None, None) (None, None, None)
} }
}; };
let (device_pool, device_blocks) = match device_factory { let (device_pool, device_blocks, device_offload_filter) = match device_factory {
Some(factory) => { Some(factory) => {
let (pool, blocks) = let (pool, blocks, offload_filter) =
create_block_pool::<_, _, Metadata>(factory, &resources, "disk")?; create_block_pool::<_, _, Metadata>(factory, &resources, "disk")?;
(Some(pool), Some(blocks)) (Some(pool), Some(blocks), offload_filter)
} }
None => { None => {
tracing::debug!("No disk layout provided; will not allocate disk blocks."); tracing::debug!("No device layout provided; will not allocate device blocks.");
(None, None) (None, None, None)
} }
}; };
...@@ -261,6 +268,12 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> { ...@@ -261,6 +268,12 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> {
local_block_set.set_nixl_metadata(nixl_agent.get_local_md()?); local_block_set.set_nixl_metadata(nixl_agent.get_local_md()?);
} }
let offload_filters = OffloadFilters::builder()
.device(device_offload_filter)
.host(host_offload_filter)
.disk(disk_offload_filter)
.build()?;
let offload_config = OffloadManagerConfig { let offload_config = OffloadManagerConfig {
nixl_agent: resources.nixl_agent.clone(), nixl_agent: resources.nixl_agent.clone(),
async_rt_handle: resources.async_rt_handle.clone(), async_rt_handle: resources.async_rt_handle.clone(),
...@@ -273,6 +286,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> { ...@@ -273,6 +286,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> {
disk_pool.clone(), disk_pool.clone(),
host_pool.clone(), host_pool.clone(),
device_pool.clone(), device_pool.clone(),
offload_filters,
offload_config, offload_config,
)?; )?;
...@@ -506,7 +520,11 @@ pub(crate) fn create_block_pool<S: Storage, L: LocalityProvider, M: BlockMetadat ...@@ -506,7 +520,11 @@ pub(crate) fn create_block_pool<S: Storage, L: LocalityProvider, M: BlockMetadat
factory: impl IntoBlocks<S, L>, factory: impl IntoBlocks<S, L>,
resources: &Resources, resources: &Resources,
pool_name: &str, pool_name: &str,
) -> Result<(Arc<dyn BlockPool<S, L, M>>, Vec<Block<S, L, M>>)> { ) -> Result<(
Arc<dyn BlockPool<S, L, M>>,
Vec<Block<S, L, M>>,
Option<Arc<dyn OffloadFilter>>,
)> {
let pool = ManagedBlockPool::<S, L, M>::builder() let pool = ManagedBlockPool::<S, L, M>::builder()
.cancel_token(resources.cancellation_token.clone()) .cancel_token(resources.cancellation_token.clone())
.global_registry(resources.global_registry.clone()) .global_registry(resources.global_registry.clone())
...@@ -515,8 +533,8 @@ pub(crate) fn create_block_pool<S: Storage, L: LocalityProvider, M: BlockMetadat ...@@ -515,8 +533,8 @@ pub(crate) fn create_block_pool<S: Storage, L: LocalityProvider, M: BlockMetadat
.pool_metrics(resources.metrics.pool(pool_name)) .pool_metrics(resources.metrics.pool(pool_name))
.build()?; .build()?;
let offload_filter = factory.offload_filter();
let blocks = factory.into_blocks()?; let blocks = factory.into_blocks()?;
Ok((Arc::new(pool), blocks))
}
// Block state operations moved to block.rs for better organization and private field access Ok((Arc::new(pool), blocks, offload_filter))
}
...@@ -26,6 +26,9 @@ impl LocalBlockDataFactories { ...@@ -26,6 +26,9 @@ impl LocalBlockDataFactories {
let device_factory = if let Some(config) = resources.config.device_layout.take() { let device_factory = if let Some(config) = resources.config.device_layout.take() {
next_block_set_idx += 1; next_block_set_idx += 1;
let offload_filter = config.offload_filter.clone();
tracing::debug!("Constructing device pool."); tracing::debug!("Constructing device pool.");
let layout = create_layout( let layout = create_layout(
layout_builder.clone(), layout_builder.clone(),
...@@ -37,6 +40,7 @@ impl LocalBlockDataFactories { ...@@ -37,6 +40,7 @@ impl LocalBlockDataFactories {
layout, layout,
next_block_set_idx, next_block_set_idx,
resources.worker_id, resources.worker_id,
offload_filter,
)) ))
} else { } else {
None None
...@@ -44,6 +48,9 @@ impl LocalBlockDataFactories { ...@@ -44,6 +48,9 @@ impl LocalBlockDataFactories {
let host_factory = if let Some(config) = resources.config.host_layout.take() { let host_factory = if let Some(config) = resources.config.host_layout.take() {
next_block_set_idx += 1; next_block_set_idx += 1;
let offload_filter = config.offload_filter.clone();
tracing::debug!("Constructing host pool."); tracing::debug!("Constructing host pool.");
let layout = create_layout( let layout = create_layout(
layout_builder.clone(), layout_builder.clone(),
...@@ -55,12 +62,15 @@ impl LocalBlockDataFactories { ...@@ -55,12 +62,15 @@ impl LocalBlockDataFactories {
layout, layout,
next_block_set_idx, next_block_set_idx,
resources.worker_id, resources.worker_id,
offload_filter,
)) ))
} else { } else {
None None
}; };
let disk_factory = if let Some(config) = resources.config.disk_layout.take() { let disk_factory = if let Some(config) = resources.config.disk_layout.take() {
let offload_filter = config.offload_filter.clone();
if resources.nixl_agent.is_none() { if resources.nixl_agent.is_none() {
tracing::warn!("NIXL is disabled; will not allocate disk blocks."); tracing::warn!("NIXL is disabled; will not allocate disk blocks.");
None None
...@@ -77,6 +87,7 @@ impl LocalBlockDataFactories { ...@@ -77,6 +87,7 @@ impl LocalBlockDataFactories {
layout, layout,
next_block_set_idx, next_block_set_idx,
resources.worker_id, resources.worker_id,
offload_filter,
)) ))
} }
} else { } else {
......
...@@ -28,6 +28,9 @@ impl<R: LogicalResources> LogicalBlockFactories<R> { ...@@ -28,6 +28,9 @@ impl<R: LogicalResources> LogicalBlockFactories<R> {
let device_factory = if let Some(config) = resources.config.device_layout.take() { let device_factory = if let Some(config) = resources.config.device_layout.take() {
next_block_set_idx += 1; next_block_set_idx += 1;
let offload_filter = config.offload_filter.clone();
let mut builder = layout_builder.clone(); let mut builder = layout_builder.clone();
let config = Arc::new(builder.num_blocks(config.num_blocks).build()?); let config = Arc::new(builder.num_blocks(config.num_blocks).build()?);
...@@ -37,6 +40,7 @@ impl<R: LogicalResources> LogicalBlockFactories<R> { ...@@ -37,6 +40,7 @@ impl<R: LogicalResources> LogicalBlockFactories<R> {
resources.worker_id, resources.worker_id,
logical_resources.clone(), logical_resources.clone(),
StorageType::Device(0), StorageType::Device(0),
offload_filter,
); );
Some(factory) Some(factory)
...@@ -46,6 +50,9 @@ impl<R: LogicalResources> LogicalBlockFactories<R> { ...@@ -46,6 +50,9 @@ impl<R: LogicalResources> LogicalBlockFactories<R> {
let host_factory = if let Some(config) = resources.config.host_layout.take() { let host_factory = if let Some(config) = resources.config.host_layout.take() {
next_block_set_idx += 1; next_block_set_idx += 1;
let offload_filter = config.offload_filter.clone();
let mut builder = layout_builder.clone(); let mut builder = layout_builder.clone();
let config = Arc::new(builder.num_blocks(config.num_blocks).build()?); let config = Arc::new(builder.num_blocks(config.num_blocks).build()?);
let factory = LogicalBlockFactory::new( let factory = LogicalBlockFactory::new(
...@@ -54,6 +61,7 @@ impl<R: LogicalResources> LogicalBlockFactories<R> { ...@@ -54,6 +61,7 @@ impl<R: LogicalResources> LogicalBlockFactories<R> {
resources.worker_id, resources.worker_id,
logical_resources.clone(), logical_resources.clone(),
StorageType::Pinned, StorageType::Pinned,
offload_filter,
); );
Some(factory) Some(factory)
...@@ -63,6 +71,9 @@ impl<R: LogicalResources> LogicalBlockFactories<R> { ...@@ -63,6 +71,9 @@ impl<R: LogicalResources> LogicalBlockFactories<R> {
let disk_factory = if let Some(config) = resources.config.disk_layout.take() { let disk_factory = if let Some(config) = resources.config.disk_layout.take() {
next_block_set_idx += 1; next_block_set_idx += 1;
let offload_filter = config.offload_filter.clone();
let mut builder = layout_builder.clone(); let mut builder = layout_builder.clone();
let config = Arc::new(builder.num_blocks(config.num_blocks).build()?); let config = Arc::new(builder.num_blocks(config.num_blocks).build()?);
let factory = LogicalBlockFactory::new( let factory = LogicalBlockFactory::new(
...@@ -71,6 +82,7 @@ impl<R: LogicalResources> LogicalBlockFactories<R> { ...@@ -71,6 +82,7 @@ impl<R: LogicalResources> LogicalBlockFactories<R> {
resources.worker_id, resources.worker_id,
logical_resources.clone(), logical_resources.clone(),
StorageType::Disk(0), StorageType::Disk(0),
offload_filter,
); );
Some(factory) Some(factory)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment