Unverified Commit 7c139ab2 authored by Ronen Schaffer's avatar Ronen Schaffer Committed by GitHub
Browse files

[KV Offload] Clean up ARC/LRU refactoring leftovers: group ARC tests and fix stale comment (#38217)


Signed-off-by: default avatarRonen Schaffer <ronen.schaffer@ibm.com>
parent 0be9516e
...@@ -15,6 +15,7 @@ from vllm.v1.kv_offload.abstract import ( ...@@ -15,6 +15,7 @@ from vllm.v1.kv_offload.abstract import (
from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager
from vllm.v1.kv_offload.cpu.policies.arc import ARCCachePolicy from vllm.v1.kv_offload.cpu.policies.arc import ARCCachePolicy
from vllm.v1.kv_offload.mediums import CPULoadStoreSpec from vllm.v1.kv_offload.mediums import CPULoadStoreSpec
from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager
@dataclass @dataclass
...@@ -243,20 +244,31 @@ def test_cpu_manager(): ...@@ -243,20 +244,31 @@ def test_cpu_manager():
) )
def test_arc_manager_basic(): class TestARCPolicy:
"""Unit tests for CPUOffloadingManager with ARC eviction policy."""
def _make_manager(
self, num_blocks: int = 4, enable_events: bool = True
) -> tuple[CPUOffloadingManager, ARCCachePolicy]:
manager = CPUOffloadingManager(
block_size=256,
num_blocks=num_blocks,
cache_policy="arc",
enable_events=enable_events,
)
policy = manager._policy
assert isinstance(policy, ARCCachePolicy)
return manager, policy
def test_basic(self):
""" """
Tests CPUOffloadingManager with arc policy. Tests CPUOffloadingManager with arc policy.
Verifies that ARC handles store, load, and lookup operations correctly. Verifies that ARC handles store, load, and lookup operations correctly.
""" """
block_size = 256 cpu_manager, arc_policy = self._make_manager()
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)
# prepare store [1, 2] # prepare store [1, 2]
prepare_store_output = arc_manager.prepare_store(to_hashes([1, 2])) prepare_store_output = cpu_manager.prepare_store(to_hashes([1, 2]))
verify_store_output( verify_store_output(
prepare_store_output, prepare_store_output,
ExpectedPrepareStoreOutput( ExpectedPrepareStoreOutput(
...@@ -267,67 +279,57 @@ def test_arc_manager_basic(): ...@@ -267,67 +279,57 @@ def test_arc_manager_basic():
) )
# lookup [1, 2] -> not ready # lookup [1, 2] -> not ready
assert arc_manager.lookup(to_hashes([1, 2])) == 0 assert cpu_manager.lookup(to_hashes([1, 2])) == 0
# no events so far # no events so far
assert list(arc_manager.take_events()) == [] assert list(cpu_manager.take_events()) == []
# complete store [1, 2] # complete store [1, 2]
arc_manager.complete_store(to_hashes([1, 2])) cpu_manager.complete_store(to_hashes([1, 2]))
verify_events( verify_events(
arc_manager.take_events(), block_size=block_size, expected_stores=({1, 2},) cpu_manager.take_events(), block_size=256, expected_stores=({1, 2},)
) )
# lookup [1, 2] # lookup [1, 2]
assert arc_manager.lookup(to_hashes([1])) == 1 assert cpu_manager.lookup(to_hashes([1])) == 1
assert arc_manager.lookup(to_hashes([1, 2])) == 2 assert cpu_manager.lookup(to_hashes([1, 2])) == 2
assert arc_manager.lookup(to_hashes([1, 2, 3])) == 2 assert cpu_manager.lookup(to_hashes([1, 2, 3])) == 2
# blocks should be in T1 (recent) # blocks should be in T1 (recent)
assert len(arc_policy.t1) == 2 assert len(arc_policy.t1) == 2
assert len(arc_policy.t2) == 0 assert len(arc_policy.t2) == 0
def test_t1_to_t2_promotion(self):
def test_arc_manager_t1_to_t2_promotion():
""" """
Tests that accessing a block in T1 promotes it to T2 (frequent). Tests that accessing a block in T1 promotes it to T2 (frequent).
This is a key feature of ARC's adaptive behavior. This is a key feature of ARC's adaptive behavior.
""" """
block_size = 256 cpu_manager, arc_policy = self._make_manager(enable_events=False)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=False
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)
# store and complete block 1 # store and complete block 1
arc_manager.prepare_store(to_hashes([1])) cpu_manager.prepare_store(to_hashes([1]))
arc_manager.complete_store(to_hashes([1])) cpu_manager.complete_store(to_hashes([1]))
# block 1 starts in T1 (recent) # block 1 starts in T1 (recent)
assert to_hashes([1])[0] in arc_policy.t1 assert to_hashes([1])[0] in arc_policy.t1
assert to_hashes([1])[0] not in arc_policy.t2 assert to_hashes([1])[0] not in arc_policy.t2
# touch block 1 (simulate second access) # touch block 1 (simulate second access)
arc_manager.touch(to_hashes([1])) cpu_manager.touch(to_hashes([1]))
# block 1 should now be in T2 (frequent) # block 1 should now be in T2 (frequent)
assert to_hashes([1])[0] not in arc_policy.t1 assert to_hashes([1])[0] not in arc_policy.t1
assert to_hashes([1])[0] in arc_policy.t2 assert to_hashes([1])[0] in arc_policy.t2
def test_eviction_with_load(self):
def test_arc_manager_eviction_with_load():
""" """
Tests ARC eviction behavior similar to LRU test. Tests ARC eviction behavior similar to LRU test.
Verifies that blocks being loaded (ref_cnt > 0) cannot be evicted. Verifies that blocks being loaded (ref_cnt > 0) cannot be evicted.
""" """
block_size = 256 cpu_manager, _ = self._make_manager()
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)
# prepare and complete store [1, 2, 3, 4] # prepare and complete store [1, 2, 3, 4]
prepare_store_output = arc_manager.prepare_store(to_hashes([1, 2, 3, 4])) prepare_store_output = cpu_manager.prepare_store(to_hashes([1, 2, 3, 4]))
verify_store_output( verify_store_output(
prepare_store_output, prepare_store_output,
ExpectedPrepareStoreOutput( ExpectedPrepareStoreOutput(
...@@ -336,79 +338,67 @@ def test_arc_manager_eviction_with_load(): ...@@ -336,79 +338,67 @@ def test_arc_manager_eviction_with_load():
block_hashes_evicted=[], block_hashes_evicted=[],
), ),
) )
arc_manager.complete_store(to_hashes([1, 2, 3, 4])) cpu_manager.complete_store(to_hashes([1, 2, 3, 4]))
# prepare load [2, 3] (increases ref_cnt) # prepare load [2, 3] (increases ref_cnt)
prepare_load_output = arc_manager.prepare_load(to_hashes([2, 3])) prepare_load_output = cpu_manager.prepare_load(to_hashes([2, 3]))
verify_load_output(prepare_load_output, [1, 2]) verify_load_output(prepare_load_output, [1, 2])
# prepare store [5, 6, 7] with [2, 3] being loaded # prepare store [5, 6, 7] with [2, 3] being loaded
# should fail because [2, 3] have ref_cnt > 0 # should fail because [2, 3] have ref_cnt > 0
assert arc_manager.prepare_store(to_hashes([5, 6, 7])) is None assert cpu_manager.prepare_store(to_hashes([5, 6, 7])) is None
# complete load [2, 3] # complete load [2, 3]
arc_manager.complete_load(to_hashes([2, 3])) cpu_manager.complete_load(to_hashes([2, 3]))
# now prepare store [5, 6, 7] should succeed # now prepare store [5, 6, 7] should succeed
# ARC will evict blocks one at a time from T1 as needed # ARC will evict blocks one at a time from T1 as needed
prepare_store_output = arc_manager.prepare_store(to_hashes([5, 6, 7])) prepare_store_output = cpu_manager.prepare_store(to_hashes([5, 6, 7]))
assert prepare_store_output is not None assert prepare_store_output is not None
# Should successfully evict enough blocks to make room (at least 1) # Should successfully evict enough blocks to make room (at least 1)
assert len(prepare_store_output.block_hashes_evicted) >= 1 assert len(prepare_store_output.block_hashes_evicted) >= 1
def test_adaptive_target(self):
def test_arc_manager_adaptive_target():
""" """
Tests ARC's adaptive target adjustment via ghost lists. Tests ARC's adaptive target adjustment via ghost lists.
When a block in B1 (ghost list) is accessed, target_t1_size increases. When a block in B1 (ghost list) is accessed, target_t1_size increases.
When a block in B2 is accessed, target_t1_size decreases. When a block in B2 is accessed, target_t1_size decreases.
""" """
block_size = 256 cpu_manager, arc_policy = self._make_manager(num_blocks=2, enable_events=False)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=2, cache_policy="arc", enable_events=False
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)
# store blocks 1, 2 (fills cache) # store blocks 1, 2 (fills cache)
arc_manager.prepare_store(to_hashes([1, 2])) cpu_manager.prepare_store(to_hashes([1, 2]))
arc_manager.complete_store(to_hashes([1, 2])) cpu_manager.complete_store(to_hashes([1, 2]))
initial_target = arc_policy.target_t1_size initial_target = arc_policy.target_t1_size
# store block 3, evicting block 1 (moves to B1 ghost list) # store block 3, evicting block 1 (moves to B1 ghost list)
arc_manager.prepare_store(to_hashes([3])) cpu_manager.prepare_store(to_hashes([3]))
arc_manager.complete_store(to_hashes([3])) cpu_manager.complete_store(to_hashes([3]))
# block 1 should be in B1 (ghost list) # block 1 should be in B1 (ghost list)
assert to_hashes([1])[0] in arc_policy.b1 assert to_hashes([1])[0] in arc_policy.b1
# touch block 1 (cache miss, but in B1) # touch block 1 (cache miss, but in B1)
# this should increase target_t1_size (favor recency) # this should increase target_t1_size (favor recency)
arc_manager.touch(to_hashes([1])) cpu_manager.touch(to_hashes([1]))
# target should have increased # target should have increased
assert arc_policy.target_t1_size > initial_target assert arc_policy.target_t1_size > initial_target
def test_t1_t2_eviction_policy(self):
def test_arc_manager_t1_t2_eviction_policy():
""" """
Tests that ARC evicts from T1 or T2 based on target_t1_size. Tests that ARC evicts from T1 or T2 based on target_t1_size.
If |T1| >= target_t1_size, evict from T1, otherwise from T2. If |T1| >= target_t1_size, evict from T1, otherwise from T2.
""" """
block_size = 256 cpu_manager, arc_policy = self._make_manager(enable_events=False)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=False
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)
# store blocks 1, 2, 3, 4 # store blocks 1, 2, 3, 4
arc_manager.prepare_store(to_hashes([1, 2, 3, 4])) cpu_manager.prepare_store(to_hashes([1, 2, 3, 4]))
arc_manager.complete_store(to_hashes([1, 2, 3, 4])) cpu_manager.complete_store(to_hashes([1, 2, 3, 4]))
# promote blocks 3, 4 to T2 by touching them # promote blocks 3, 4 to T2 by touching them
arc_manager.touch(to_hashes([3, 4])) cpu_manager.touch(to_hashes([3, 4]))
# now: T1 = {1, 2}, T2 = {3, 4} # now: T1 = {1, 2}, T2 = {3, 4}
assert len(arc_policy.t1) == 2 assert len(arc_policy.t1) == 2
...@@ -419,73 +409,61 @@ def test_arc_manager_t1_t2_eviction_policy(): ...@@ -419,73 +409,61 @@ def test_arc_manager_t1_t2_eviction_policy():
arc_policy.target_t1_size = 1 arc_policy.target_t1_size = 1
# store block 5, should evict from T1 (block 1, LRU in T1) # store block 5, should evict from T1 (block 1, LRU in T1)
output = arc_manager.prepare_store(to_hashes([5])) output = cpu_manager.prepare_store(to_hashes([5]))
assert output is not None assert output is not None
assert to_hashes([1]) == output.block_hashes_evicted assert to_hashes([1]) == output.block_hashes_evicted
arc_manager.complete_store(to_hashes([5])) cpu_manager.complete_store(to_hashes([5]))
# block 1 should be in B1 (ghost list) # block 1 should be in B1 (ghost list)
assert to_hashes([1])[0] in arc_policy.b1 assert to_hashes([1])[0] in arc_policy.b1
# block 5 should be in T1 # block 5 should be in T1
assert to_hashes([5])[0] in arc_policy.t1 assert to_hashes([5])[0] in arc_policy.t1
def test_ghost_list_bounds(self):
def test_arc_manager_ghost_list_bounds():
""" """
Tests that ghost lists (B1, B2) don't grow unbounded. Tests that ghost lists (B1, B2) don't grow unbounded.
They should be capped at cache_capacity. They should be capped at cache_capacity.
""" """
block_size = 256 cpu_manager, arc_policy = self._make_manager(num_blocks=2, enable_events=False)
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=2, cache_policy="arc", enable_events=False
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)
# fill cache with blocks 1, 2 # fill cache with blocks 1, 2
arc_manager.prepare_store(to_hashes([1, 2])) cpu_manager.prepare_store(to_hashes([1, 2]))
arc_manager.complete_store(to_hashes([1, 2])) cpu_manager.complete_store(to_hashes([1, 2]))
# store many blocks to fill ghost lists # store many blocks to fill ghost lists
for i in range(3, 20): for i in range(3, 20):
arc_manager.prepare_store(to_hashes([i])) cpu_manager.prepare_store(to_hashes([i]))
arc_manager.complete_store(to_hashes([i])) cpu_manager.complete_store(to_hashes([i]))
# ghost lists should not exceed cache_capacity # ghost lists should not exceed cache_capacity
assert len(arc_policy.b1) <= arc_policy.cache_capacity assert len(arc_policy.b1) <= arc_policy.cache_capacity
assert len(arc_policy.b2) <= arc_policy.cache_capacity assert len(arc_policy.b2) <= arc_policy.cache_capacity
def test_touch_ordering(self):
def test_arc_manager_touch_ordering():
""" """
Tests that touch() correctly updates access patterns. Tests that touch() correctly updates access patterns.
Similar to LRU test but verifies T1/T2 ordering. Similar to LRU test but verifies T1/T2 ordering.
""" """
block_size = 256 cpu_manager, arc_policy = self._make_manager()
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)
# store blocks 1, 2, 3, 4 # store blocks 1, 2, 3, 4
arc_manager.prepare_store(to_hashes([1, 2, 3, 4])) cpu_manager.prepare_store(to_hashes([1, 2, 3, 4]))
arc_manager.complete_store(to_hashes([1, 2, 3, 4])) cpu_manager.complete_store(to_hashes([1, 2, 3, 4]))
# promote 3, 4 to T2 # promote 3, 4 to T2
arc_manager.touch(to_hashes([3, 4])) cpu_manager.touch(to_hashes([3, 4]))
# T1 = {1, 2}, T2 = {3, 4} # T1 = {1, 2}, T2 = {3, 4}
# touch [1, 3, 4] - should promote 1 to T2, and move 3,4 to end of T2 # touch [1, 3, 4] - should promote 1 to T2, and move 3,4 to end of T2
arc_manager.touch(to_hashes([1, 3, 4])) cpu_manager.touch(to_hashes([1, 3, 4]))
# T1 = {2}, T2 = {1, 3, 4} (in that order, with 4 most recent) # T1 = {2}, T2 = {1, 3, 4} (in that order, with 4 most recent)
assert len(arc_policy.t1) == 1 assert len(arc_policy.t1) == 1
assert len(arc_policy.t2) == 3 assert len(arc_policy.t2) == 3
# store block 5, should evict from T1 (block 2, only one in T1) # store block 5, should evict from T1 (block 2, only one in T1)
prepare_store_output = arc_manager.prepare_store(to_hashes([5])) prepare_store_output = cpu_manager.prepare_store(to_hashes([5]))
verify_store_output( verify_store_output(
prepare_store_output, prepare_store_output,
ExpectedPrepareStoreOutput( ExpectedPrepareStoreOutput(
...@@ -495,33 +473,27 @@ def test_arc_manager_touch_ordering(): ...@@ -495,33 +473,27 @@ def test_arc_manager_touch_ordering():
), ),
) )
def test_failed_store(self):
def test_arc_manager_failed_store():
""" """
Tests that failed store operations clean up correctly. Tests that failed store operations clean up correctly.
Similar to LRU test but for ARC. Similar to LRU test but for ARC.
""" """
block_size = 256 cpu_manager, arc_policy = self._make_manager()
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)
# store blocks 1, 2, 3, 4 # store blocks 1, 2, 3, 4
arc_manager.prepare_store(to_hashes([1, 2, 3, 4])) cpu_manager.prepare_store(to_hashes([1, 2, 3, 4]))
arc_manager.complete_store(to_hashes([1, 2, 3, 4])) cpu_manager.complete_store(to_hashes([1, 2, 3, 4]))
# prepare store block 5 (will evict block 1) # prepare store block 5 (will evict block 1)
prepare_store_output = arc_manager.prepare_store(to_hashes([5])) prepare_store_output = cpu_manager.prepare_store(to_hashes([5]))
assert prepare_store_output is not None assert prepare_store_output is not None
assert len(prepare_store_output.block_hashes_evicted) == 1 assert len(prepare_store_output.block_hashes_evicted) == 1
# complete store with failure # complete store with failure
arc_manager.complete_store(to_hashes([5]), success=False) cpu_manager.complete_store(to_hashes([5]), success=False)
# block 5 should not be in cache # block 5 should not be in cache
assert arc_manager.lookup(to_hashes([5])) == 0 assert cpu_manager.lookup(to_hashes([5])) == 0
# block 5 should not be in T1 or T2 # block 5 should not be in T1 or T2
assert to_hashes([5])[0] not in arc_policy.t1 assert to_hashes([5])[0] not in arc_policy.t1
assert to_hashes([5])[0] not in arc_policy.t2 assert to_hashes([5])[0] not in arc_policy.t2
...@@ -530,47 +502,41 @@ def test_arc_manager_failed_store(): ...@@ -530,47 +502,41 @@ def test_arc_manager_failed_store():
evicted_hash = prepare_store_output.block_hashes_evicted[0] evicted_hash = prepare_store_output.block_hashes_evicted[0]
assert evicted_hash in arc_policy.b1 assert evicted_hash in arc_policy.b1
def test_full_scenario(self):
def test_arc_manager_full_scenario():
""" """
Comprehensive test covering multiple ARC operations in sequence. Comprehensive test covering multiple ARC operations in sequence.
Similar to the full LRU test but adapted for ARC behavior. Similar to the full LRU test but adapted for ARC behavior.
""" """
block_size = 256 cpu_manager, arc_policy = self._make_manager()
arc_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
)
arc_policy = arc_manager._policy
assert isinstance(arc_policy, ARCCachePolicy)
# store [1, 2] # store [1, 2]
arc_manager.prepare_store(to_hashes([1, 2])) cpu_manager.prepare_store(to_hashes([1, 2]))
arc_manager.complete_store(to_hashes([1, 2])) cpu_manager.complete_store(to_hashes([1, 2]))
# store [3, 4, 5] -> evicts [1] # store [3, 4, 5] -> evicts [1]
prepare_store_output = arc_manager.prepare_store(to_hashes([3, 4, 5])) prepare_store_output = cpu_manager.prepare_store(to_hashes([3, 4, 5]))
assert prepare_store_output is not None assert prepare_store_output is not None
assert len(prepare_store_output.block_hashes_evicted) == 1 assert len(prepare_store_output.block_hashes_evicted) == 1
arc_manager.complete_store(to_hashes([3, 4, 5])) cpu_manager.complete_store(to_hashes([3, 4, 5]))
# promote some blocks to T2 # promote some blocks to T2
arc_manager.touch(to_hashes([2, 3])) cpu_manager.touch(to_hashes([2, 3]))
# T1 has {4, 5}, T2 has {2, 3} # T1 has {4, 5}, T2 has {2, 3}
assert len(arc_policy.t1) == 2 assert len(arc_policy.t1) == 2
assert len(arc_policy.t2) == 2 assert len(arc_policy.t2) == 2
# store [6] -> should evict from T1 (4 is oldest in T1) # store [6] -> should evict from T1 (4 is oldest in T1)
prepare_store_output = arc_manager.prepare_store(to_hashes([6])) prepare_store_output = cpu_manager.prepare_store(to_hashes([6]))
assert prepare_store_output is not None assert prepare_store_output is not None
arc_manager.complete_store(to_hashes([6])) cpu_manager.complete_store(to_hashes([6]))
# verify blocks 2, 3 (in T2) are still present # verify blocks 2, 3 (in T2) are still present
assert arc_manager.lookup(to_hashes([2])) == 1 assert cpu_manager.lookup(to_hashes([2])) == 1
assert arc_manager.lookup(to_hashes([3])) == 1 assert cpu_manager.lookup(to_hashes([3])) == 1
# verify events # verify events
events = list(arc_manager.take_events()) events = list(cpu_manager.take_events())
assert len(events) > 0 # should have store and eviction events assert len(events) > 0 # should have store and eviction events
...@@ -583,8 +549,6 @@ def test_filter_reused_manager(): ...@@ -583,8 +549,6 @@ def test_filter_reused_manager():
block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True
) )
from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager
manager = FilterReusedOffloadingManager( manager = FilterReusedOffloadingManager(
backing=lru_manager, store_threshold=2, max_tracker_size=3 backing=lru_manager, store_threshold=2, max_tracker_size=3
) )
......
...@@ -93,9 +93,8 @@ class FilterReusedOffloadingManager(OffloadingManager): ...@@ -93,9 +93,8 @@ class FilterReusedOffloadingManager(OffloadingManager):
] ]
# Delegate to the backing manager with only the eligible hashes. # Delegate to the backing manager with only the eligible hashes.
# Passing an empty list is intentional and safe — both # Passing an empty list is intentional and safe — CPUOffloadingManager
# LRUOffloadingManager and ARCOffloadingManager handle it correctly, # handles it correctly, returning a PrepareStoreOutput with empty lists.
# returning a PrepareStoreOutput with empty lists.
return self._backing.prepare_store(eligible) return self._backing.prepare_store(eligible)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment