# SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project from collections.abc import Iterable from typing import Literal from vllm.v1.kv_offload.abstract import ( LoadStoreSpec, OffloadingEvent, OffloadingManager, OffloadKey, PrepareStoreOutput, ReqContext, ) from vllm.v1.kv_offload.cpu.policies.abstract import BlockStatus, CachePolicy from vllm.v1.kv_offload.cpu.policies.arc import ARCCachePolicy from vllm.v1.kv_offload.cpu.policies.lru import LRUCachePolicy from vllm.v1.kv_offload.mediums import CPULoadStoreSpec _CACHE_POLICIES: dict[str, type[CachePolicy]] = { "lru": LRUCachePolicy, "arc": ARCCachePolicy, } class CPUOffloadingManager(OffloadingManager): """ An OffloadingManager with a pluggable CachePolicy (LRU or ARC). The manager owns all shared logic: ref-counting, event emission, block pool management, and the prepare_store/complete_store skeletons. Policy-specific block organization and eviction decisions are delegated to the CachePolicy implementation. """ def __init__( self, num_blocks: int, cache_policy: Literal["lru", "arc"] = "lru", enable_events: bool = False, ): self.medium: str = CPULoadStoreSpec.medium() self._num_blocks: int = num_blocks self._num_allocated_blocks: int = 0 self._free_list: list[int] = [] self.events: list[OffloadingEvent] | None = [] if enable_events else None policy_cls = _CACHE_POLICIES.get(cache_policy) if policy_cls is None: raise ValueError( f"Unknown cache policy: {cache_policy!r}. " f"Supported: {list(_CACHE_POLICIES)}" ) self._policy: CachePolicy = policy_cls(cache_capacity=num_blocks) # --- block pool --- def _get_num_free_blocks(self) -> int: return len(self._free_list) + self._num_blocks - self._num_allocated_blocks def _allocate_blocks(self, keys: list[OffloadKey]) -> list[BlockStatus]: num_fresh = min(len(keys), self._num_blocks - self._num_allocated_blocks) num_reused = len(keys) - num_fresh assert len(self._free_list) >= num_reused # allocate fresh blocks blocks: list[BlockStatus] = [] for _ in range(num_fresh): blocks.append(BlockStatus(self._num_allocated_blocks)) self._num_allocated_blocks += 1 # allocate reused blocks for _ in range(num_reused): blocks.append(BlockStatus(self._free_list.pop())) return blocks def _free_block(self, block: BlockStatus) -> None: self._free_list.append(block.block_id) def _get_load_store_spec( self, keys: Iterable[OffloadKey], blocks: Iterable[BlockStatus], ) -> CPULoadStoreSpec: return CPULoadStoreSpec([block.block_id for block in blocks]) # --- OffloadingManager interface --- def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None: block = self._policy.get(key) return block is not None and block.is_ready def prepare_load( self, keys: Iterable[OffloadKey], req_context: ReqContext, ) -> LoadStoreSpec: blocks = [] for key in keys: block = self._policy.get(key) assert block is not None, f"Block {key!r} not found in cache" assert block.is_ready, f"Block {key!r} is not ready for reading" block.ref_cnt += 1 blocks.append(block) return self._get_load_store_spec(keys, blocks) def touch(self, keys: Iterable[OffloadKey]) -> None: self._policy.touch(keys) def complete_load(self, keys: Iterable[OffloadKey]) -> None: for key in keys: block = self._policy.get(key) assert block is not None, f"Block {key!r} not found" assert block.ref_cnt > 0, f"Block {key!r} ref_cnt is already 0" block.ref_cnt -= 1 def prepare_store( self, keys: Iterable[OffloadKey], req_context: ReqContext, ) -> PrepareStoreOutput | None: keys_list = list(keys) # filter out blocks that are already stored keys_to_store = [k for k in keys_list if self._policy.get(k) is None] if not keys_to_store: return PrepareStoreOutput( keys_to_store=[], store_spec=self._get_load_store_spec([], []), evicted_keys=[], ) num_blocks_to_evict = len(keys_to_store) - self._get_num_free_blocks() to_evict: list[OffloadKey] = [] if num_blocks_to_evict > 0: # Blocks from the original input are excluded from eviction candidates: # a block that was already stored must remain in the cache after this call. protected = set(keys_list) evicted = self._policy.evict(num_blocks_to_evict, protected) if evicted is None: return None for key, block in evicted: self._free_block(block) to_evict.append(key) if to_evict and self.events is not None: self.events.append( OffloadingEvent( keys=to_evict, medium=self.medium, removed=True, ) ) blocks = self._allocate_blocks(keys_to_store) assert len(blocks) == len(keys_to_store), ( "Block pool did not allocate the expected number of blocks" ) for key, block in zip(keys_to_store, blocks): self._policy.insert(key, block) # build store specs for allocated blocks store_spec = self._get_load_store_spec(keys_to_store, blocks) return PrepareStoreOutput( keys_to_store=keys_to_store, store_spec=store_spec, evicted_keys=to_evict, ) def complete_store(self, keys: Iterable[OffloadKey], success: bool = True) -> None: stored_keys: list[OffloadKey] = [] if success: for key in keys: block = self._policy.get(key) if block is not None and not block.is_ready: block.ref_cnt = 0 stored_keys.append(key) else: for key in keys: block = self._policy.get(key) if block is not None and not block.is_ready: self._policy.remove(key) self._free_block(block) if stored_keys and self.events is not None: self.events.append( OffloadingEvent( keys=stored_keys, medium=self.medium, removed=False, ) ) def take_events(self) -> Iterable[OffloadingEvent]: if self.events is not None: yield from self.events self.events.clear()