Unverified Commit 235e1f93 authored by Or Ozeri's avatar Or Ozeri Committed by GitHub
Browse files

[kv_offload+HMA][3/N]: Remove block_size from KVEvents (#36644)


Signed-off-by: default avatarOr Ozeri <oro@il.ibm.com>
parent 431cea3e
...@@ -124,12 +124,8 @@ def test_offloading_connector(request_runner, async_scheduling: bool): ...@@ -124,12 +124,8 @@ def test_offloading_connector(request_runner, async_scheduling: bool):
return [BlockHash(str(i).encode()) for i in int_hashes] return [BlockHash(str(i).encode()) for i in int_hashes]
def take_events() -> Iterable[OffloadingEvent]: def take_events() -> Iterable[OffloadingEvent]:
yield OffloadingEvent( yield OffloadingEvent(keys=to_keys([1, 2, 3]), medium="A", removed=False)
keys=to_keys([1, 2, 3]), block_size=16, medium="A", removed=False yield OffloadingEvent(keys=to_keys([4, 5, 6]), medium="B", removed=True)
)
yield OffloadingEvent(
keys=to_keys([4, 5, 6]), block_size=32, medium="B", removed=True
)
runner.manager.take_events.side_effect = take_events runner.manager.take_events.side_effect = take_events
events = list(runner.scheduler_connector.take_events()) events = list(runner.scheduler_connector.take_events())
...@@ -137,7 +133,7 @@ def test_offloading_connector(request_runner, async_scheduling: bool): ...@@ -137,7 +133,7 @@ def test_offloading_connector(request_runner, async_scheduling: bool):
event = events[0] event = events[0]
assert isinstance(event, BlockStored) assert isinstance(event, BlockStored)
assert event.block_hashes == to_hashes([1, 2, 3]) assert event.block_hashes == to_hashes([1, 2, 3])
assert event.block_size == 16 assert event.block_size == 0
assert event.medium == "A" assert event.medium == "A"
assert event.token_ids == [] assert event.token_ids == []
assert event.parent_block_hash is None assert event.parent_block_hash is None
......
...@@ -59,7 +59,6 @@ def verify_load_output( ...@@ -59,7 +59,6 @@ def verify_load_output(
def verify_events( def verify_events(
events: Iterable[OffloadingEvent], events: Iterable[OffloadingEvent],
block_size: int,
expected_stores: tuple[set[int], ...] = (), expected_stores: tuple[set[int], ...] = (),
expected_evictions: tuple[set[int], ...] = (), expected_evictions: tuple[set[int], ...] = (),
): ):
...@@ -67,7 +66,6 @@ def verify_events( ...@@ -67,7 +66,6 @@ def verify_events(
evictions: list[set[OffloadKey]] = [] evictions: list[set[OffloadKey]] = []
for event in events: for event in events:
assert event.medium == CPULoadStoreSpec.medium() assert event.medium == CPULoadStoreSpec.medium()
assert event.block_size == block_size
if event.removed: if event.removed:
evictions.append(set(event.keys)) evictions.append(set(event.keys))
else: else:
...@@ -98,9 +96,7 @@ def test_already_stored_block_not_evicted_during_prepare_store(eviction_policy): ...@@ -98,9 +96,7 @@ def test_already_stored_block_not_evicted_during_prepare_store(eviction_policy):
candidate to make room for [3, 4, 5] candidate to make room for [3, 4, 5]
- After complete_store([2, 3, 4, 5]), block 2 must still be present. - After complete_store([2, 3, 4, 5]), block 2 must still be present.
""" """
block_size = 256
manager = CPUOffloadingManager( manager = CPUOffloadingManager(
block_size=block_size,
num_blocks=4, num_blocks=4,
cache_policy=eviction_policy, cache_policy=eviction_policy,
enable_events=True, enable_events=True,
...@@ -138,10 +134,9 @@ def test_cpu_manager(): ...@@ -138,10 +134,9 @@ def test_cpu_manager():
""" """
Tests CPUOffloadingManager with lru policy. Tests CPUOffloadingManager with lru policy.
""" """
# initialize a CPU backend with a capacity of 4 blocks # initialize a CPU manager with a capacity of 4 blocks
block_size = 256
cpu_manager = CPUOffloadingManager( cpu_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True num_blocks=4, cache_policy="lru", enable_events=True
) )
# prepare store [1, 2] # prepare store [1, 2]
...@@ -163,9 +158,7 @@ def test_cpu_manager(): ...@@ -163,9 +158,7 @@ def test_cpu_manager():
# complete store [1, 2] # complete store [1, 2]
cpu_manager.complete_store(to_keys([1, 2])) cpu_manager.complete_store(to_keys([1, 2]))
verify_events( verify_events(cpu_manager.take_events(), expected_stores=({1, 2},))
cpu_manager.take_events(), block_size=block_size, expected_stores=({1, 2},)
)
# lookup [1, 2] # lookup [1, 2]
assert cpu_manager.lookup(to_keys([1])) == 1 assert cpu_manager.lookup(to_keys([1])) == 1
...@@ -184,9 +177,7 @@ def test_cpu_manager(): ...@@ -184,9 +177,7 @@ def test_cpu_manager():
) )
# verify eviction event # verify eviction event
verify_events( verify_events(cpu_manager.take_events(), expected_evictions=({1},))
cpu_manager.take_events(), block_size=block_size, expected_evictions=({1},)
)
# prepare store with no space # prepare store with no space
assert cpu_manager.prepare_store(to_keys([1, 6])) is None assert cpu_manager.prepare_store(to_keys([1, 6])) is None
...@@ -241,7 +232,6 @@ def test_cpu_manager(): ...@@ -241,7 +232,6 @@ def test_cpu_manager():
verify_events( verify_events(
cpu_manager.take_events(), cpu_manager.take_events(),
block_size=block_size,
expected_stores=({3, 4, 5}, {6, 7, 8}), expected_stores=({3, 4, 5}, {6, 7, 8}),
expected_evictions=({2, 3, 4}, {8}), expected_evictions=({2, 3, 4}, {8}),
) )
...@@ -254,7 +244,6 @@ class TestARCPolicy: ...@@ -254,7 +244,6 @@ class TestARCPolicy:
self, num_blocks: int = 4, enable_events: bool = True self, num_blocks: int = 4, enable_events: bool = True
) -> tuple[CPUOffloadingManager, ARCCachePolicy]: ) -> tuple[CPUOffloadingManager, ARCCachePolicy]:
manager = CPUOffloadingManager( manager = CPUOffloadingManager(
block_size=256,
num_blocks=num_blocks, num_blocks=num_blocks,
cache_policy="arc", cache_policy="arc",
enable_events=enable_events, enable_events=enable_events,
...@@ -289,9 +278,7 @@ class TestARCPolicy: ...@@ -289,9 +278,7 @@ class TestARCPolicy:
# complete store [1, 2] # complete store [1, 2]
cpu_manager.complete_store(to_keys([1, 2])) cpu_manager.complete_store(to_keys([1, 2]))
verify_events( verify_events(cpu_manager.take_events(), expected_stores=({1, 2},))
cpu_manager.take_events(), block_size=256, expected_stores=({1, 2},)
)
# lookup [1, 2] # lookup [1, 2]
assert cpu_manager.lookup(to_keys([1])) == 1 assert cpu_manager.lookup(to_keys([1])) == 1
...@@ -547,9 +534,8 @@ def test_filter_reused_manager(): ...@@ -547,9 +534,8 @@ def test_filter_reused_manager():
""" """
Tests FilterReusedOffloadingManager with a CPUOffloadingManager. Tests FilterReusedOffloadingManager with a CPUOffloadingManager.
""" """
block_size = 256
lru_manager = CPUOffloadingManager( lru_manager = CPUOffloadingManager(
block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True num_blocks=4, cache_policy="lru", enable_events=True
) )
manager = FilterReusedOffloadingManager( manager = FilterReusedOffloadingManager(
......
...@@ -424,7 +424,7 @@ class OffloadingConnectorScheduler: ...@@ -424,7 +424,7 @@ class OffloadingConnectorScheduler:
parent_block_hash=None, parent_block_hash=None,
token_ids=[], token_ids=[],
lora_id=None, lora_id=None,
block_size=event.block_size, block_size=0,
medium=event.medium, medium=event.medium,
lora_name=None, lora_name=None,
) )
......
...@@ -79,7 +79,6 @@ class PrepareStoreOutput: ...@@ -79,7 +79,6 @@ class PrepareStoreOutput:
@dataclass @dataclass
class OffloadingEvent: class OffloadingEvent:
keys: list[OffloadKey] keys: list[OffloadKey]
block_size: int
medium: str medium: str
# True if blocks are removed, False if stored # True if blocks are removed, False if stored
removed: bool removed: bool
......
...@@ -33,12 +33,10 @@ class CPUOffloadingManager(OffloadingManager): ...@@ -33,12 +33,10 @@ class CPUOffloadingManager(OffloadingManager):
def __init__( def __init__(
self, self,
block_size: int,
num_blocks: int, num_blocks: int,
cache_policy: Literal["lru", "arc"] = "lru", cache_policy: Literal["lru", "arc"] = "lru",
enable_events: bool = False, enable_events: bool = False,
): ):
self.block_size: int = block_size
self.medium: str = CPULoadStoreSpec.medium() self.medium: str = CPULoadStoreSpec.medium()
self._num_blocks: int = num_blocks self._num_blocks: int = num_blocks
self._num_allocated_blocks: int = 0 self._num_allocated_blocks: int = 0
...@@ -145,7 +143,6 @@ class CPUOffloadingManager(OffloadingManager): ...@@ -145,7 +143,6 @@ class CPUOffloadingManager(OffloadingManager):
self.events.append( self.events.append(
OffloadingEvent( OffloadingEvent(
keys=to_evict, keys=to_evict,
block_size=self.block_size,
medium=self.medium, medium=self.medium,
removed=True, removed=True,
) )
...@@ -188,7 +185,6 @@ class CPUOffloadingManager(OffloadingManager): ...@@ -188,7 +185,6 @@ class CPUOffloadingManager(OffloadingManager):
self.events.append( self.events.append(
OffloadingEvent( OffloadingEvent(
keys=stored_keys, keys=stored_keys,
block_size=self.block_size,
medium=self.medium, medium=self.medium,
removed=False, removed=False,
) )
......
...@@ -60,12 +60,7 @@ class CPUOffloadingSpec(OffloadingSpec): ...@@ -60,12 +60,7 @@ class CPUOffloadingSpec(OffloadingSpec):
kv_events_config is not None and kv_events_config.enable_kv_cache_events kv_events_config is not None and kv_events_config.enable_kv_cache_events
) )
assert len(self.gpu_block_size) == 1
gpu_block_size = self.gpu_block_size[0]
offloaded_block_size = gpu_block_size * self.block_size_factor
self._manager = CPUOffloadingManager( self._manager = CPUOffloadingManager(
block_size=offloaded_block_size,
num_blocks=self.num_blocks, num_blocks=self.num_blocks,
cache_policy=self.eviction_policy, # type: ignore[arg-type] cache_policy=self.eviction_policy, # type: ignore[arg-type]
enable_events=enable_events, enable_events=enable_events,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment