// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 //! Policy trait and built-in implementations for offload filtering. //! //! Policies determine which blocks should be offloaded. They are evaluated //! as filters - blocks that fail any filter are removed from the transfer. //! //! # Performance Optimization //! //! This module uses `Either` instead of `#[async_trait]` to //! avoid heap allocations for synchronous policies. Policies that only perform //! local, synchronous operations (like `PresenceFilter`, `PassAllPolicy`) return //! `Either::Left(ready(...))` which requires zero heap allocation. Policies that //! need actual async operations return `Either::Right(Box::pin(...))`. //! //! # Built-in Policies //! //! - `PresenceFilter`: Skip blocks already present in destination tier //! - `PresenceAndLFUFilter`: Presence check + LFU count threshold //! - `PassAllPolicy`: No filtering (pass all blocks) //! - `AllOfPolicy`: Composite AND policy //! - `AnyOfPolicy`: Composite OR policy use std::future::{Future, Ready, ready}; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; use anyhow::Result; use futures::future::Either; use kvbm_config::{PolicyType, TierOffloadConfig}; use crate::{BlockId, SequenceHash}; use kvbm_logical::blocks::{BlockMetadata, BlockRegistry, ImmutableBlock}; use super::pending::{PendingCheck, PendingTracker}; use crate::object::{ObjectBlockOps, ObjectLockManager}; /// Boxed future type for async policy evaluation. pub type BoxFuture<'a, T> = Pin + Send + 'a>>; /// Future type for single-block policy evaluation. /// /// - `Left(Ready<...>)`: Synchronous result, zero heap allocation /// - `Right(BoxFuture<...>)`: Async result, requires heap allocation pub type PolicyFuture<'a> = Either>, BoxFuture<'a, Result>>; /// Future type for batch policy evaluation. /// /// - `Left(Ready<...>)`: Synchronous result, zero heap allocation /// - `Right(BoxFuture<...>)`: Async result, requires heap allocation pub type PolicyBatchFuture<'a> = Either>>, BoxFuture<'a, Result>>>; /// Create a synchronous policy result (zero allocation). #[inline] pub fn sync_result(result: Result) -> PolicyFuture<'static> { Either::Left(ready(result)) } /// Create a synchronous batch policy result (zero allocation). #[inline] pub fn sync_batch_result(result: Result>) -> PolicyBatchFuture<'static> { Either::Left(ready(result)) } /// Create an async policy result (boxes the future). #[inline] pub fn async_result<'a, F>(future: F) -> PolicyFuture<'a> where F: Future> + Send + 'a, { Either::Right(Box::pin(future)) } /// Create an async batch policy result (boxes the future). #[inline] pub fn async_batch_result<'a, F>(future: F) -> PolicyBatchFuture<'a> where F: Future>> + Send + 'a, { Either::Right(Box::pin(future)) } // ============================================================================ // Presence Checker Trait // ============================================================================ /// Async presence checker for object storage or other external destinations. /// /// This trait abstracts presence checking for destinations that require async /// operations (like S3, caching services). Unlike `BlockRegistry::check_presence` /// which is synchronous, this is designed for remote/external destinations. /// /// # Implementations /// /// - `S3PresenceChecker`: Wraps `ObjectBlockOps::has_blocks()` for S3/object storage /// - Future: `CachedPresenceChecker` - local bloom filter / LRU cache layer /// - Future: `DistributedCacheChecker` - remote caching service pub trait PresenceChecker: Send + Sync { /// Check if blocks exist at the destination. /// /// Returns a vector of (SequenceHash, exists: bool) pairs. fn check_presence( &self, keys: Vec, ) -> BoxFuture<'static, Vec<(SequenceHash, bool)>>; } /// S3/Object storage presence checker. /// /// Wraps `ObjectBlockOps::has_blocks()` and converts `Option` → `bool`. /// This is the default presence checker for G2→G4 (object storage) pipelines. /// /// # Example /// ```ignore /// let object_ops: Arc = ...; /// let checker = S3PresenceChecker::new(object_ops); /// let results = checker.check_presence(keys).await; /// ``` pub struct S3PresenceChecker { object_ops: Arc, } impl S3PresenceChecker { /// Create a new S3 presence checker wrapping the given object operations. pub fn new(object_ops: Arc) -> Self { Self { object_ops } } } impl PresenceChecker for S3PresenceChecker { fn check_presence( &self, keys: Vec, ) -> BoxFuture<'static, Vec<(SequenceHash, bool)>> { let future = self.object_ops.has_blocks(keys); Box::pin(async move { let results = future.await; // Convert Option (size) → bool (exists) results .into_iter() .map(|(hash, size_opt)| (hash, size_opt.is_some())) .collect() }) } } // ============================================================================ // Evaluation Context // ============================================================================ /// Context provided to policies for block evaluation. #[derive(Debug)] pub struct EvalContext { /// Block ID pub block_id: BlockId, /// Sequence hash for this block pub sequence_hash: SequenceHash, /// Optional strong reference to the block. /// - Some: Strong blocks (held during evaluation) /// - None: Weak blocks (deferred upgrade) pub block: Option>, } impl EvalContext { /// Create a new evaluation context from a strong block reference. pub fn new(block: ImmutableBlock) -> Self { Self { block_id: block.block_id(), sequence_hash: block.sequence_hash(), block: Some(block), } } /// Create a context for weak block evaluation (deferred upgrade). /// /// Used when evaluating weak blocks - we have the metadata /// but defer the actual upgrade until just before transfer. pub fn from_weak(block_id: BlockId, sequence_hash: SequenceHash) -> Self { Self { block_id, sequence_hash, block: None, } } /// Create a context for external block evaluation. /// /// Used when evaluating external blocks (e.g., G1 from vLLM) - we have /// the block_id and sequence_hash but no ImmutableBlock reference. pub fn from_external(block_id: BlockId, sequence_hash: SequenceHash) -> Self { Self { block_id, sequence_hash, block: None, } } } /// Trait for offload policies that filter blocks. /// /// Policies are evaluated as a chain - a block must pass ALL policies to proceed. /// Each policy receives an `EvalContext` with block information and returns /// `Ok(true)` to pass or `Ok(false)` to filter out. /// /// # Performance /// /// This trait uses `Either` instead of `#[async_trait]` to /// avoid heap allocations for synchronous policies. Implement using: /// - `sync_result(Ok(true))` for synchronous policies (zero allocation) /// - `async_result(async { ... })` for async policies (boxes the future) /// /// # Batch Evaluation /// /// The `evaluate_batch` method provides a default implementation that calls /// `evaluate` for each block. Override for efficiency when the policy can /// benefit from batching (e.g., batch registry lookups). pub trait OffloadPolicy: Send + Sync { /// Unique name for this policy (for logging/debugging). fn name(&self) -> &str; /// Evaluate whether a block should be offloaded. /// /// Returns: /// - `Ok(true)`: Block passes this filter, continue to next policy /// - `Ok(false)`: Block filtered out, remove from transfer /// - `Err(_)`: Fatal error, fail the entire transfer fn evaluate<'a>(&'a self, ctx: &'a EvalContext) -> PolicyFuture<'a>; /// Batch evaluate multiple blocks. /// /// Default implementation calls `evaluate` for each block. /// Override for efficiency when batching is beneficial. fn evaluate_batch<'a>(&'a self, contexts: &'a [EvalContext]) -> PolicyBatchFuture<'a> { // Default: sequential evaluation let contexts_clone: Vec<_> = contexts.iter().collect(); async_batch_result(async move { let mut results = Vec::with_capacity(contexts_clone.len()); for ctx in contexts_clone { // This calls the sync or async evaluate let result = match self.evaluate(ctx) { Either::Left(ready) => ready.await, Either::Right(boxed) => boxed.await, }; results.push(result?); } Ok(results) }) } } /// G1→G2 filter: skip blocks already present in destination tier. /// /// Uses `BlockRegistry::check_presence` to determine if a block exists /// in the destination tier without acquiring a full block reference. /// This is efficient because it only checks the registry metadata. /// /// # Duplicate Prevention /// /// When a `PendingTracker` is configured, this filter also checks for blocks /// that are currently in-flight through the pipeline. This prevents duplicate /// transfers when overlapping sequences are enqueued at roughly the same time. /// /// # Performance /// /// This policy is fully synchronous and returns `Either::Left(Ready)`, /// avoiding any heap allocation per evaluation. /// /// # Example /// ```ignore /// let tracker = Arc::new(PendingTracker::new()); /// let filter = PresenceFilter::::new(registry.clone()) /// .with_pending_tracker(tracker); /// // Blocks already in G2 OR in-flight will be filtered out /// ``` pub struct PresenceFilter { registry: Arc, /// Optional tracker for pending (in-flight) transfers. /// When set, blocks that are already being transferred will be filtered out. pending_tracker: Option>, _marker: PhantomData<(Src, Dst)>, } impl PresenceFilter { /// Create a new presence filter without pending tracking. pub fn new(registry: Arc) -> Self { Self { registry, pending_tracker: None, _marker: PhantomData, } } /// Add a pending tracker for duplicate prevention. /// /// When set, blocks that are currently in-flight (passed policy but not /// yet registered in destination) will be filtered out. pub fn with_pending_tracker(mut self, tracker: Arc) -> Self { self.pending_tracker = Some(tracker); self } /// Get a reference to the pending tracker if configured. pub fn pending_tracker(&self) -> Option<&Arc> { self.pending_tracker.as_ref() } } impl OffloadPolicy for PresenceFilter { fn name(&self) -> &str { "PresenceFilter" } fn evaluate<'a>(&'a self, ctx: &'a EvalContext) -> PolicyFuture<'a> { // Purely synchronous - uses Left(Ready), zero heap allocation // 1. Check if already present in destination registry let presence = self.registry.check_presence::(&[ctx.sequence_hash]); if presence[0].1 { return sync_result(Ok(false)); // Already transferred } // 2. Check if currently in-flight (pending transfer) if self.pending_tracker.is_hash_pending(&ctx.sequence_hash) { return sync_result(Ok(false)); // Already being transferred } sync_result(Ok(true)) // Not present, not pending - pass } fn evaluate_batch<'a>(&'a self, contexts: &'a [EvalContext]) -> PolicyBatchFuture<'a> { if contexts.is_empty() { return sync_batch_result(Ok(Vec::new())); } // Batch lookup for efficiency - still synchronous let hashes: Vec = contexts.iter().map(|ctx| ctx.sequence_hash).collect(); let presence = self.registry.check_presence::(&hashes); // Build results checking both registry presence and pending status let results: Vec = presence .into_iter() .map(|(hash, present)| { if present { return false; } if self.pending_tracker.is_hash_pending(&hash) { return false; } true }) .collect(); sync_batch_result(Ok(results)) } } /// G2→G3 filter: presence check + LFU count threshold. /// /// Combines two filter conditions: /// 1. Skip blocks already present in destination tier /// 2. Only offload blocks with LFU count above threshold /// /// The LFU threshold ensures we only offload "hot" blocks that have been /// accessed frequently, avoiding wasted transfers for rarely-used blocks. /// /// # Duplicate Prevention /// /// When a `PendingTracker` is configured, this filter also checks for blocks /// that are currently in-flight through the pipeline. /// /// # Performance /// /// This policy is fully synchronous and returns `Either::Left(Ready)`, /// avoiding any heap allocation per evaluation. /// /// # Example /// ```ignore /// // Only offload blocks with LFU count > 8 that aren't in G3 or in-flight /// let tracker = Arc::new(PendingTracker::new()); /// let filter = PresenceAndLFUFilter::::new(registry.clone(), 8) /// .with_pending_tracker(tracker); /// ``` pub struct PresenceAndLFUFilter { registry: Arc, min_lfu_count: u32, /// Optional tracker for pending (in-flight) transfers. pending_tracker: Option>, _marker: PhantomData<(Src, Dst)>, } impl PresenceAndLFUFilter { /// Create a new presence + LFU filter with specified threshold. pub fn new(registry: Arc, min_lfu_count: u32) -> Self { Self { registry, min_lfu_count, pending_tracker: None, _marker: PhantomData, } } /// Create with default threshold of 8. pub fn with_default_threshold(registry: Arc) -> Self { Self::new(registry, 8) } /// Add a pending tracker for duplicate prevention. pub fn with_pending_tracker(mut self, tracker: Arc) -> Self { self.pending_tracker = Some(tracker); self } } impl OffloadPolicy for PresenceAndLFUFilter { fn name(&self) -> &str { "PresenceAndLFUFilter" } fn evaluate<'a>(&'a self, ctx: &'a EvalContext) -> PolicyFuture<'a> { // 1. Skip if already in Dst let presence = self.registry.check_presence::(&[ctx.sequence_hash]); if presence[0].1 { return sync_result(Ok(false)); } // 2. Skip if currently pending transfer if self.pending_tracker.is_hash_pending(&ctx.sequence_hash) { return sync_result(Ok(false)); } // 3. Check LFU count > threshold if let Some(tracker) = self.registry.frequency_tracker() { // Convert SequenceHash to u128 for the tracker let count = tracker.count(ctx.sequence_hash.as_u128()); return sync_result(Ok(count > self.min_lfu_count)); } // No frequency tracker = pass all (conservative default) sync_result(Ok(true)) } fn evaluate_batch<'a>(&'a self, contexts: &'a [EvalContext]) -> PolicyBatchFuture<'a> { if contexts.is_empty() { return sync_batch_result(Ok(Vec::new())); } // Batch presence lookup let hashes: Vec = contexts.iter().map(|ctx| ctx.sequence_hash).collect(); let presence = self.registry.check_presence::(&hashes); // Get trackers once let freq_tracker = self.registry.frequency_tracker(); let min_lfu = self.min_lfu_count; let results: Vec = presence .into_iter() .zip(contexts.iter()) .map(|((hash, present), ctx)| { // Skip if present in Dst if present { return false; } // Skip if currently pending if self.pending_tracker.is_hash_pending(&hash) { return false; } // Check LFU count if let Some(ref t) = freq_tracker { let count = t.count(ctx.sequence_hash.as_u128()); count > min_lfu } else { true // No tracker = pass } }) .collect(); sync_batch_result(Ok(results)) } } /// G2→G4 filter: async presence check for object storage destinations. /// /// Unlike `PresenceFilter` which checks local `BlockRegistry` synchronously, /// this filter queries object storage (S3, etc.) asynchronously via a /// `PresenceChecker` implementation. /// /// # Duplicate Prevention /// /// When a `PendingTracker` is configured, this filter also checks for blocks /// that are currently in-flight through the pipeline before querying object storage. /// /// # Performance /// /// This policy returns `Either::Right(BoxFuture)` since it requires async I/O. /// The pending tracker check is done synchronously first to avoid unnecessary /// object storage queries. /// /// # Example /// ```ignore /// let object_ops: Arc = ...; /// let checker = Arc::new(S3PresenceChecker::new(object_ops)); /// let tracker = Arc::new(PendingTracker::new()); /// let filter = ObjectPresenceFilter::::new(checker) /// .with_pending_tracker(tracker); /// // Blocks already in object storage OR in-flight will be filtered out /// ``` pub struct ObjectPresenceFilter { presence_checker: Arc, /// Optional tracker for pending (in-flight) transfers. pending_tracker: Option>, _marker: PhantomData, } impl ObjectPresenceFilter { /// Create a new object presence filter. pub fn new(presence_checker: Arc) -> Self { Self { presence_checker, pending_tracker: None, _marker: PhantomData, } } /// Add a pending tracker for duplicate prevention. /// /// When set, blocks that are currently in-flight (passed policy but not /// yet stored in object storage) will be filtered out. pub fn with_pending_tracker(mut self, tracker: Arc) -> Self { self.pending_tracker = Some(tracker); self } /// Get a reference to the pending tracker if configured. pub fn pending_tracker(&self) -> Option<&Arc> { self.pending_tracker.as_ref() } } impl OffloadPolicy for ObjectPresenceFilter { fn name(&self) -> &str { "ObjectPresenceFilter" } fn evaluate<'a>(&'a self, ctx: &'a EvalContext) -> PolicyFuture<'a> { // 1. Synchronous check: skip if currently pending if self.pending_tracker.is_hash_pending(&ctx.sequence_hash) { return sync_result(Ok(false)); // Already being transferred } // 2. Async check: query object storage for presence let checker = self.presence_checker.clone(); let hash = ctx.sequence_hash; async_result(async move { let results = checker.check_presence(vec![hash]).await; // If present in object storage, filter out let exists = results .into_iter() .next() .map(|(_, exists)| exists) .unwrap_or(false); Ok(!exists) // Pass if NOT present }) } fn evaluate_batch<'a>(&'a self, contexts: &'a [EvalContext]) -> PolicyBatchFuture<'a> { if contexts.is_empty() { return sync_batch_result(Ok(Vec::new())); } // Collect hashes, filtering out pending ones first (sync) let mut pending_status: Vec = Vec::with_capacity(contexts.len()); let mut hashes_to_check: Vec = Vec::new(); let mut hash_indices: Vec = Vec::new(); for (i, ctx) in contexts.iter().enumerate() { if self.pending_tracker.is_hash_pending(&ctx.sequence_hash) { pending_status.push(true); // Mark as pending (will be filtered) } else { pending_status.push(false); hashes_to_check.push(ctx.sequence_hash); hash_indices.push(i); } } // If all are pending, return immediately if hashes_to_check.is_empty() { return sync_batch_result(Ok(vec![false; contexts.len()])); } let checker = self.presence_checker.clone(); let num_contexts = contexts.len(); async_batch_result(async move { // Query object storage for non-pending hashes let presence_results = checker.check_presence(hashes_to_check).await; // Build final results let mut results = vec![false; num_contexts]; // Default: filtered out // Map presence results back to original indices for (check_idx, original_idx) in hash_indices.into_iter().enumerate() { if let Some((_, exists)) = presence_results.get(check_idx) { // Pass if NOT present in object storage results[original_idx] = !*exists; } } Ok(results) }) } } /// G2→G4 filter with distributed locking: check meta, acquire lock, track acquired locks. /// /// This filter implements the full locking protocol for object storage offloads: /// 1. Check if `.meta` file exists (block already offloaded) - skip if yes /// 2. Check if currently pending (in-flight transfer) - skip if yes /// 3. Try to acquire `.lock` file with conditional PUT /// - If lock doesn't exist, create it atomically /// - If lock exists and expired, overwrite it /// - If lock exists and valid (owned by another instance), skip /// 4. If we own the lock (either just acquired or already owned), pass the block /// /// # Lock Management /// /// Locks acquired during policy evaluation are tracked and must be: /// - Released after successful transfer (via `ObjectTransferExecutor`) /// - Released on error/cancellation (via guard or explicit cleanup) /// /// # Duplicate Prevention /// /// When a `PendingTracker` is configured, blocks currently in-flight are filtered /// out before checking object storage, avoiding redundant network calls. /// /// # Example /// ```ignore /// let lock_manager = Arc::new(S3LockManager::new(s3_client, instance_id)); /// let tracker = Arc::new(PendingTracker::new()); /// let filter = ObjectLockPresenceFilter::::new(lock_manager) /// .with_pending_tracker(tracker); /// // Blocks already offloaded, in-flight, or locked by others will be filtered out /// ``` pub struct ObjectLockPresenceFilter { lock_manager: Arc, /// Optional tracker for pending (in-flight) transfers. pending_tracker: Option>, _marker: PhantomData, } impl ObjectLockPresenceFilter { /// Create a new object lock presence filter. pub fn new(lock_manager: Arc) -> Self { Self { lock_manager, pending_tracker: None, _marker: PhantomData, } } /// Add a pending tracker for duplicate prevention. /// /// When set, blocks that are currently in-flight (passed policy but not /// yet stored in object storage) will be filtered out. pub fn with_pending_tracker(mut self, tracker: Arc) -> Self { self.pending_tracker = Some(tracker); self } /// Get a reference to the pending tracker if configured. pub fn pending_tracker(&self) -> Option<&Arc> { self.pending_tracker.as_ref() } /// Get a reference to the lock manager. pub fn lock_manager(&self) -> &Arc { &self.lock_manager } } impl OffloadPolicy for ObjectLockPresenceFilter { fn name(&self) -> &str { "ObjectLockPresenceFilter" } fn evaluate<'a>(&'a self, ctx: &'a EvalContext) -> PolicyFuture<'a> { // 1. Synchronous check: skip if currently pending if self.pending_tracker.is_hash_pending(&ctx.sequence_hash) { return sync_result(Ok(false)); // Already being transferred } // 2. Async checks: meta presence, then lock acquisition let lock_manager = self.lock_manager.clone(); let hash = ctx.sequence_hash; async_result(async move { // Check if meta file exists (already offloaded) match lock_manager.has_meta(hash).await { Ok(true) => { tracing::debug!(?hash, "Block already offloaded (meta exists)"); return Ok(false); // Already offloaded, skip } Ok(false) => { // Continue to lock acquisition } Err(e) => { tracing::warn!(?hash, error = %e, "Error checking meta file"); return Ok(false); // Error, skip to be safe } } // Try to acquire lock match lock_manager.try_acquire_lock(hash).await { Ok(true) => { tracing::debug!(?hash, "Lock acquired"); Ok(true) // Pass - we own the lock } Ok(false) => { tracing::debug!(?hash, "Lock held by another instance"); Ok(false) // Skip - another instance owns the lock } Err(e) => { tracing::warn!(?hash, error = %e, "Error acquiring lock"); Ok(false) // Error, skip to be safe } } }) } fn evaluate_batch<'a>(&'a self, contexts: &'a [EvalContext]) -> PolicyBatchFuture<'a> { if contexts.is_empty() { return sync_batch_result(Ok(Vec::new())); } // Filter out pending blocks first (sync) let mut pending_mask: Vec = Vec::with_capacity(contexts.len()); let mut to_check: Vec<(usize, SequenceHash)> = Vec::new(); for (i, ctx) in contexts.iter().enumerate() { if self.pending_tracker.is_hash_pending(&ctx.sequence_hash) { pending_mask.push(true); } else { pending_mask.push(false); to_check.push((i, ctx.sequence_hash)); } } // If all are pending, return immediately if to_check.is_empty() { return sync_batch_result(Ok(vec![false; contexts.len()])); } let lock_manager = self.lock_manager.clone(); let num_contexts = contexts.len(); async_batch_result(async move { let mut results = vec![false; num_contexts]; // Default: filtered out // Process each non-pending block for (original_idx, hash) in to_check { // Check meta first let has_meta = match lock_manager.has_meta(hash).await { Ok(has) => has, Err(e) => { tracing::warn!(?hash, error = %e, "Error checking meta file"); continue; // Skip this block } }; if has_meta { tracing::debug!(?hash, "Block already offloaded (meta exists)"); continue; // Already offloaded } // Try to acquire lock match lock_manager.try_acquire_lock(hash).await { Ok(true) => { tracing::debug!(?hash, "Lock acquired"); results[original_idx] = true; // Pass } Ok(false) => { tracing::debug!(?hash, "Lock held by another instance"); // Skip - another instance owns the lock } Err(e) => { tracing::warn!(?hash, error = %e, "Error acquiring lock"); // Skip on error } } } Ok(results) }) } } /// Composite policy that requires ALL sub-policies to pass (AND logic). pub struct AllOfPolicy { policies: Vec>>, } impl AllOfPolicy { /// Create a new AND composite policy. pub fn new(policies: Vec>>) -> Self { Self { policies } } /// Add a policy to the composite. pub fn with(mut self, policy: Arc>) -> Self { self.policies.push(policy); self } } impl OffloadPolicy for AllOfPolicy { fn name(&self) -> &str { "AllOfPolicy" } fn evaluate<'a>(&'a self, ctx: &'a EvalContext) -> PolicyFuture<'a> { // Must use async because sub-policies might be async let policies = &self.policies; async_result(async move { for policy in policies { let result = match policy.evaluate(ctx) { Either::Left(ready) => ready.await, Either::Right(boxed) => boxed.await, }; if !result? { return Ok(false); } } Ok(true) }) } } /// Composite policy that requires ANY sub-policy to pass (OR logic). pub struct AnyOfPolicy { policies: Vec>>, } impl AnyOfPolicy { /// Create a new OR composite policy. pub fn new(policies: Vec>>) -> Self { Self { policies } } /// Add a policy to the composite. pub fn with(mut self, policy: Arc>) -> Self { self.policies.push(policy); self } } impl OffloadPolicy for AnyOfPolicy { fn name(&self) -> &str { "AnyOfPolicy" } fn evaluate<'a>(&'a self, ctx: &'a EvalContext) -> PolicyFuture<'a> { if self.policies.is_empty() { return sync_result(Ok(true)); // No policies = pass } // Must use async because sub-policies might be async let policies = &self.policies; async_result(async move { for policy in policies { let result = match policy.evaluate(ctx) { Either::Left(ready) => ready.await, Either::Right(boxed) => boxed.await, }; if result? { return Ok(true); } } Ok(false) }) } } /// A pass-all policy (no filtering). /// /// # Performance /// /// This policy is fully synchronous and returns `Either::Left(Ready)`, /// avoiding any heap allocation per evaluation. pub struct PassAllPolicy { _marker: PhantomData, } impl PassAllPolicy { /// Create a new pass-all policy. pub fn new() -> Self { Self { _marker: PhantomData, } } } impl Default for PassAllPolicy { fn default() -> Self { Self::new() } } impl OffloadPolicy for PassAllPolicy { fn name(&self) -> &str { "PassAllPolicy" } fn evaluate<'a>(&'a self, _ctx: &'a EvalContext) -> PolicyFuture<'a> { // Zero allocation - just returns ready(Ok(true)) sync_result(Ok(true)) } fn evaluate_batch<'a>(&'a self, contexts: &'a [EvalContext]) -> PolicyBatchFuture<'a> { sync_batch_result(Ok(vec![true; contexts.len()])) } } /// Create a composite policy from tier configuration. /// /// Policies are applied in order with AND logic - blocks must pass all policies. /// Returns `PassAllPolicy` if no policies are configured. /// /// When a `pending_tracker` is provided, it is automatically wired into /// `Presence` and `PresenceLfu` policies to enable duplicate prevention /// for blocks currently in-flight through the pipeline. /// /// # Example /// /// ```ignore /// use kvbm_config::offload::TierOffloadConfig; /// /// let tracker = Arc::new(PendingTracker::new()); /// let config = TierOffloadConfig { /// policies: vec![PolicyType::Presence, PolicyType::PresenceLfu], /// presence_lfu: PresenceLfuFilterConfig { min_lfu_count: 8 }, /// ..Default::default() /// }; /// /// // Pending tracker is automatically wired into presence-based policies /// let policy = create_policy_from_config::(&config, registry.clone(), Some(tracker)); /// ``` pub fn create_policy_from_config( config: &TierOffloadConfig, registry: Arc, pending_tracker: Option>, ) -> Arc> where Src: BlockMetadata + 'static, Dst: BlockMetadata + 'static, { if config.policies.is_empty() { return Arc::new(PassAllPolicy::::new()); } let policies: Vec>> = config .policies .iter() .map(|policy_type| -> Arc> { match policy_type { PolicyType::PassAll => Arc::new(PassAllPolicy::::new()), PolicyType::Presence => { let mut filter = PresenceFilter::::new(registry.clone()); if let Some(tracker) = &pending_tracker { filter = filter.with_pending_tracker(tracker.clone()); } Arc::new(filter) } PolicyType::PresenceLfu => { let mut filter = PresenceAndLFUFilter::::new( registry.clone(), config.presence_lfu.min_lfu_count, ); if let Some(tracker) = &pending_tracker { filter = filter.with_pending_tracker(tracker.clone()); } Arc::new(filter) } } }) .collect(); if policies.len() == 1 { policies.into_iter().next().unwrap() } else { Arc::new(AllOfPolicy::new(policies)) } } #[cfg(test)] mod tests { use super::*; // Note: Full tests require BlockRegistry infrastructure which needs // tokio runtime and complex setup. Basic API tests here. #[test] fn test_pass_all_policy() { let _policy: PassAllPolicy<()> = PassAllPolicy::new(); // Would test evaluate with proper setup } #[test] fn test_all_of_policy_creation() { let policies: Vec>> = vec![Arc::new(PassAllPolicy::new())]; let composite = AllOfPolicy::new(policies); assert_eq!(composite.name(), "AllOfPolicy"); } #[test] fn test_any_of_policy_creation() { let policies: Vec>> = vec![Arc::new(PassAllPolicy::new())]; let composite = AnyOfPolicy::new(policies); assert_eq!(composite.name(), "AnyOfPolicy"); } #[tokio::test] async fn test_sync_result_zero_alloc() { // Verify sync_result returns Left variant let future = sync_result(Ok(true)); assert!(matches!(future, Either::Left(_))); let result = match future { Either::Left(ready) => ready.await, Either::Right(_) => unreachable!(), }; assert!(result.unwrap()); } #[tokio::test] async fn test_async_result_boxes() { // Verify async_result returns Right variant let future = async_result(async { Ok(false) }); assert!(matches!(future, Either::Right(_))); let result = match future { Either::Left(_) => unreachable!(), Either::Right(boxed) => boxed.await, }; assert!(!result.unwrap()); } #[test] fn test_pending_tracker_wiring() { use super::PendingTracker; // Verify pending_tracker can be set on PresenceFilter let tracker = Arc::new(PendingTracker::new()); let registry = Arc::new(BlockRegistry::new()); let filter: PresenceFilter<(), ()> = PresenceFilter::new(registry).with_pending_tracker(tracker.clone()); // Verify we can get the tracker back assert!(filter.pending_tracker().is_some()); assert!(Arc::ptr_eq(filter.pending_tracker().unwrap(), &tracker)); } #[test] fn test_pending_tracker_wiring_lfu() { use super::PendingTracker; // Verify pending_tracker can be set on PresenceAndLFUFilter let tracker = Arc::new(PendingTracker::new()); let registry = Arc::new(BlockRegistry::new()); let filter: PresenceAndLFUFilter<(), ()> = PresenceAndLFUFilter::new(registry, 8).with_pending_tracker(tracker); // Filter was successfully created with pending tracker assert_eq!(filter.name(), "PresenceAndLFUFilter"); } }