# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from collections.abc import Iterable
from dataclasses import dataclass

import numpy as np
import pytest

from vllm.v1.core.kv_cache_utils import BlockHash
from vllm.v1.kv_offload.abstract import (
    LoadStoreSpec,
    OffloadingEvent,
    PrepareStoreOutput,
)
from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager
from vllm.v1.kv_offload.cpu.policies.arc import ARCCachePolicy
from vllm.v1.kv_offload.mediums import CPULoadStoreSpec


@dataclass
class ExpectedPrepareStoreOutput:
    """Expected result of a prepare_store() call, written with plain ints.

    verify_store_output() converts the integer hashes with to_hashes()
    before comparing them against the actual output.
    """

    # integer stand-ins for the block hashes that should be stored
    block_hashes_to_store: list[int]
    # block IDs expected in the resulting store spec
    store_block_ids: list[int]
    # integer stand-ins for the block hashes that should be evicted
    block_hashes_evicted: list[int]


def to_hashes(int_hashes: list[int]) -> list[BlockHash]:
    """Turn each int into a BlockHash built from its encoded decimal string."""
    hashes: list[BlockHash] = []
    for value in int_hashes:
        encoded = str(value).encode()
        hashes.append(BlockHash(encoded))
    return hashes


def verify_store_output(
    prepare_store_output: PrepareStoreOutput | None,
    expected_prepare_store_output: ExpectedPrepareStoreOutput,
):
    """Assert that a prepare_store() result matches the expectation.

    Checks that the output is not None, that the hashes to store and the
    evicted hashes equal the expected ones (after to_hashes() conversion),
    and that the store spec is a CPULoadStoreSpec whose block_ids equal the
    expected block IDs.
    """
    assert prepare_store_output is not None
    assert prepare_store_output.block_hashes_to_store == to_hashes(
        expected_prepare_store_output.block_hashes_to_store
    )
    assert prepare_store_output.block_hashes_evicted == to_hashes(
        expected_prepare_store_output.block_hashes_evicted
    )
    store_spec = prepare_store_output.store_spec
    assert isinstance(store_spec, CPULoadStoreSpec)
    expected_array = np.array(
        expected_prepare_store_output.store_block_ids, dtype=np.int64
    )
    assert np.array_equal(expected_array, store_spec.block_ids)


def verify_load_output(
    prepare_load_output: LoadStoreSpec, expected_prepare_load_output: list[int]
):
    """Assert that a prepare_load() result is a CPU spec with the given IDs.

    The output must be a CPULoadStoreSpec whose block_ids equal
    expected_prepare_load_output (compared as an int64 array).
    """
    assert isinstance(prepare_load_output, CPULoadStoreSpec)
    expected_array = np.array(expected_prepare_load_output, dtype=np.int64)
    assert np.array_equal(expected_array, prepare_load_output.block_ids)


def verify_events(
    events: Iterable[OffloadingEvent],
    block_size: int,
    expected_stores: tuple[set[int], ...] = (),
    expected_evictions: tuple[set[int], ...] = (),
):
    """Assert that the emitted offloading events match the expectation.

    Every event must report the CPU medium and the given block_size.
    Events with ``removed`` set are collected as evictions, all others as
    stores; each group is compared, in order, against the expected sets of
    integer hashes (converted with to_hashes()).
    """
    stores: list[set[BlockHash]] = []
    evictions: list[set[BlockHash]] = []
    for event in events:
        assert event.medium == CPULoadStoreSpec.medium()
        assert event.block_size == block_size
        if event.removed:
            evictions.append(set(event.block_hashes))
        else:
            stores.append(set(event.block_hashes))

    def to_hash_sets(int_sets: tuple[set[int], ...]) -> tuple[set[BlockHash], ...]:
        # convert each set of int hashes into a set of BlockHash values
        return tuple(set(to_hashes(list(int_set))) for int_set in int_sets)

    assert tuple(evictions) == to_hash_sets(expected_evictions)
    assert tuple(stores) == to_hash_sets(expected_stores)


@pytest.mark.parametrize("eviction_policy", ["lru", "arc"])
def test_already_stored_block_not_evicted_during_prepare_store(eviction_policy):
    """
    Regression test: a block that is already stored must not be evicted
    by prepare_store() when it needs to make room for new blocks.
    Applies to both lru and arc policies.

    Scenario:
        - Store blocks [1, 2] and complete.
        - touch([1]) makes block 2 the LRU candidate.
        - prepare_store([2, 3, 4, 5]):
            * block 2 is filtered out as "already stored"
            * but without the fix, block 2 would be evicted as the LRU
              candidate to make room for [3, 4, 5]
        - After complete_store([2, 3, 4, 5]), block 2 must still be present.
    """
    block_size = 256
    manager = CPUOffloadingManager(
        block_size=block_size,
        num_blocks=4,
        cache_policy=eviction_policy,
        enable_events=True,
    )

    # store [1, 2] and complete
    manager.prepare_store(to_hashes([1, 2]))
    manager.complete_store(to_hashes([1, 2]))

    # touch [1] to make block 2 the LRU candidate
    manager.touch(to_hashes([1]))

    # prepare_store([2, 3, 4, 5]):
    #   - block 2 is already stored → filtered out of block_hashes_to_store
    #   - block 2 must NOT be evicted even though it is the LRU candidate
    #   - block 1 (ID 0) is evicted instead; new blocks [3,4,5] get IDs 2,3,0
    prepare_store_output = manager.prepare_store(to_hashes([2, 3, 4, 5]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[3, 4, 5],
            store_block_ids=[2, 3, 0],
            block_hashes_evicted=[1],  # block 1 evicted, not block 2
        ),
    )

    # complete_store must not silently drop block 2
    manager.complete_store(to_hashes([2, 3, 4, 5]))

    # block 2 must still be present in the cache
    assert manager.lookup(to_hashes([2])) == 1


def test_cpu_manager():
    """
    Tests CPUOffloadingManager with lru policy.
    """
    # initialize a CPU backend with a capacity of 4 blocks
    block_size = 256
    cpu_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True
    )

    # prepare store [1, 2]
    prepare_store_output = cpu_manager.prepare_store(to_hashes([1, 2]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[1, 2],
            store_block_ids=[0, 1],
            block_hashes_evicted=[],
        ),
    )

    # lookup [1, 2] -> not ready
    assert cpu_manager.lookup(to_hashes([1, 2])) == 0

    # no events so far
    assert list(cpu_manager.take_events()) == []

    # complete store [1, 2]
    cpu_manager.complete_store(to_hashes([1, 2]))
    verify_events(
        cpu_manager.take_events(), block_size=block_size, expected_stores=({1, 2},)
    )

    # lookup [1, 2]
    assert cpu_manager.lookup(to_hashes([1])) == 1
    assert cpu_manager.lookup(to_hashes([1, 2])) == 2
    assert cpu_manager.lookup(to_hashes([1, 2, 3])) == 2

    # prepare store [2, 3, 4, 5] -> evicts [1]
    prepare_store_output = cpu_manager.prepare_store(to_hashes([2, 3, 4, 5]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[3, 4, 5],
            store_block_ids=[2, 3, 0],
            block_hashes_evicted=[1],
        ),
    )

    # verify eviction event
    verify_events(
        cpu_manager.take_events(), block_size=block_size, expected_evictions=({1},)
    )

    # prepare store with no space
    assert cpu_manager.prepare_store(to_hashes([1, 6])) is None

    # complete store [2, 3, 4, 5]
    cpu_manager.complete_store(to_hashes([2, 3, 4, 5]))

    # prepare load [2, 3]
    prepare_load_output = cpu_manager.prepare_load(to_hashes([2, 3]))
    verify_load_output(prepare_load_output, [1, 2])

    # prepare store with no space ([2, 3] is being loaded)
    assert cpu_manager.prepare_store(to_hashes([6, 7, 8])) is None

    # complete load [2, 3]
    cpu_manager.complete_load(to_hashes([2, 3]))

    # prepare store [6, 7, 8] -> evicts [2, 3, 4] (oldest)
    prepare_store_output = cpu_manager.prepare_store(to_hashes([6, 7, 8]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[6, 7, 8],
            store_block_ids=[3, 2, 1],
            block_hashes_evicted=[2, 3, 4],
        ),
    )

    # complete store [6, 7, 8]
    cpu_manager.complete_store(to_hashes([6, 7, 8]))

    # touch [5, 6, 7] (move to end of LRU order)
    cpu_manager.touch(to_hashes([5, 6, 7]))

    # prepare store [9] -> evicts [8] (oldest following previous touch)
    prepare_store_output = cpu_manager.prepare_store(to_hashes([9]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[9],
            store_block_ids=[1],
            block_hashes_evicted=[8],
        ),
    )

    # complete store [7, 9] with failure
    cpu_manager.complete_store(to_hashes([7, 9]), success=False)

    # assert [7] is still stored, but [9] is not
    assert cpu_manager.lookup(to_hashes([7])) == 1
    assert cpu_manager.lookup(to_hashes([9])) == 0

    # events accumulated since the last take_events() call
    verify_events(
        cpu_manager.take_events(),
        block_size=block_size,
        expected_stores=({3, 4, 5}, {6, 7, 8}),
        expected_evictions=({2, 3, 4}, {8}),
    )


def test_arc_manager_basic():
    """
    Tests CPUOffloadingManager with arc policy.
    Verifies that ARC handles store, load, and lookup operations correctly.
    """
    block_size = 256
    arc_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
    )
    arc_policy = arc_manager._policy
    assert isinstance(arc_policy, ARCCachePolicy)

    # prepare store [1, 2]
    prepare_store_output = arc_manager.prepare_store(to_hashes([1, 2]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[1, 2],
            store_block_ids=[0, 1],
            block_hashes_evicted=[],
        ),
    )

    # lookup [1, 2] -> not ready
    assert arc_manager.lookup(to_hashes([1, 2])) == 0

    # no events so far
    assert list(arc_manager.take_events()) == []

    # complete store [1, 2]
    arc_manager.complete_store(to_hashes([1, 2]))
    verify_events(
        arc_manager.take_events(), block_size=block_size, expected_stores=({1, 2},)
    )

    # lookup [1, 2]
    assert arc_manager.lookup(to_hashes([1])) == 1
    assert arc_manager.lookup(to_hashes([1, 2])) == 2
    assert arc_manager.lookup(to_hashes([1, 2, 3])) == 2

    # blocks should be in T1 (recent)
    assert len(arc_policy.t1) == 2
    assert len(arc_policy.t2) == 0


def test_arc_manager_t1_to_t2_promotion():
    """
    Tests that accessing a block in T1 promotes it to T2 (frequent).
    This is a key feature of ARC's adaptive behavior.
    """
    block_size = 256
    arc_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=False
    )
    arc_policy = arc_manager._policy
    assert isinstance(arc_policy, ARCCachePolicy)

    block_1 = to_hashes([1])[0]

    # store and complete block 1
    arc_manager.prepare_store(to_hashes([1]))
    arc_manager.complete_store(to_hashes([1]))

    # block 1 starts in T1 (recent)
    assert block_1 in arc_policy.t1
    assert block_1 not in arc_policy.t2

    # touch block 1 (simulate second access)
    arc_manager.touch(to_hashes([1]))

    # block 1 should now be in T2 (frequent)
    assert block_1 not in arc_policy.t1
    assert block_1 in arc_policy.t2


def test_arc_manager_eviction_with_load():
    """
    Tests ARC eviction behavior similar to LRU test.
    Verifies that blocks being loaded (ref_cnt > 0) cannot be evicted.
    """
    block_size = 256
    arc_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
    )

    # prepare and complete store [1, 2, 3, 4]
    prepare_store_output = arc_manager.prepare_store(to_hashes([1, 2, 3, 4]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[1, 2, 3, 4],
            store_block_ids=[0, 1, 2, 3],
            block_hashes_evicted=[],
        ),
    )
    arc_manager.complete_store(to_hashes([1, 2, 3, 4]))

    # prepare load [2, 3] (increases ref_cnt)
    prepare_load_output = arc_manager.prepare_load(to_hashes([2, 3]))
    verify_load_output(prepare_load_output, [1, 2])

    # prepare store [5, 6, 7] with [2, 3] being loaded
    # should fail because [2, 3] have ref_cnt > 0
    assert arc_manager.prepare_store(to_hashes([5, 6, 7])) is None

    # complete load [2, 3]
    arc_manager.complete_load(to_hashes([2, 3]))

    # now prepare store [5, 6, 7] should succeed
    # ARC will evict blocks one at a time from T1 as needed
    prepare_store_output = arc_manager.prepare_store(to_hashes([5, 6, 7]))
    assert prepare_store_output is not None
    # Should successfully evict enough blocks to make room (at least 1)
    assert len(prepare_store_output.block_hashes_evicted) >= 1


def test_arc_manager_adaptive_target():
    """
    Tests ARC's adaptive target adjustment via ghost lists.
    When a block in B1 (ghost list) is accessed, target_t1_size increases.
    When a block in B2 is accessed, target_t1_size decreases.
    """
    block_size = 256
    arc_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=2, cache_policy="arc", enable_events=False
    )
    arc_policy = arc_manager._policy
    assert isinstance(arc_policy, ARCCachePolicy)

    # store blocks 1, 2 (fills cache)
    arc_manager.prepare_store(to_hashes([1, 2]))
    arc_manager.complete_store(to_hashes([1, 2]))

    # remember the target before any ghost-list hit
    initial_target = arc_policy.target_t1_size

    # store block 3, evicting block 1 (moves to B1 ghost list)
    arc_manager.prepare_store(to_hashes([3]))
    arc_manager.complete_store(to_hashes([3]))

    # block 1 should be in B1 (ghost list)
    assert to_hashes([1])[0] in arc_policy.b1

    # touch block 1 (cache miss, but in B1)
    # this should increase target_t1_size (favor recency)
    arc_manager.touch(to_hashes([1]))

    # target should have increased
    assert arc_policy.target_t1_size > initial_target


def test_arc_manager_t1_t2_eviction_policy():
    """
    Tests that ARC evicts from T1 or T2 based on target_t1_size.
    If |T1| >= target_t1_size, evict from T1, otherwise from T2.
    """
    block_size = 256
    arc_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=False
    )
    arc_policy = arc_manager._policy
    assert isinstance(arc_policy, ARCCachePolicy)

    # store blocks 1, 2, 3, 4
    arc_manager.prepare_store(to_hashes([1, 2, 3, 4]))
    arc_manager.complete_store(to_hashes([1, 2, 3, 4]))

    # promote blocks 3, 4 to T2 by touching them
    arc_manager.touch(to_hashes([3, 4]))

    # now: T1 = {1, 2}, T2 = {3, 4}
    assert len(arc_policy.t1) == 2
    assert len(arc_policy.t2) == 2

    # set target_t1_size to prefer evicting from T1
    # (when |T1| >= target, evict from T1)
    arc_policy.target_t1_size = 1

    # store block 5, should evict from T1 (block 1, LRU in T1)
    output = arc_manager.prepare_store(to_hashes([5]))
    assert output is not None
    assert to_hashes([1]) == output.block_hashes_evicted

    arc_manager.complete_store(to_hashes([5]))

    # block 1 should be in B1 (ghost list)
    assert to_hashes([1])[0] in arc_policy.b1
    # block 5 should be in T1
    assert to_hashes([5])[0] in arc_policy.t1


def test_arc_manager_ghost_list_bounds():
    """
    Tests that ghost lists (B1, B2) don't grow unbounded.
    They should be capped at cache_capacity.
    """
    block_size = 256
    arc_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=2, cache_policy="arc", enable_events=False
    )
    arc_policy = arc_manager._policy
    assert isinstance(arc_policy, ARCCachePolicy)

    # fill cache with blocks 1, 2
    arc_manager.prepare_store(to_hashes([1, 2]))
    arc_manager.complete_store(to_hashes([1, 2]))

    # store many blocks to fill ghost lists
    for i in range(3, 20):
        arc_manager.prepare_store(to_hashes([i]))
        arc_manager.complete_store(to_hashes([i]))

    # ghost lists should not exceed cache_capacity
    assert len(arc_policy.b1) <= arc_policy.cache_capacity
    assert len(arc_policy.b2) <= arc_policy.cache_capacity


def test_arc_manager_touch_ordering():
    """
    Tests that touch() correctly updates access patterns.
    Similar to LRU test but verifies T1/T2 ordering.
    """
    block_size = 256
    arc_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
    )
    arc_policy = arc_manager._policy
    assert isinstance(arc_policy, ARCCachePolicy)

    # store blocks 1, 2, 3, 4
    arc_manager.prepare_store(to_hashes([1, 2, 3, 4]))
    arc_manager.complete_store(to_hashes([1, 2, 3, 4]))

    # promote 3, 4 to T2
    arc_manager.touch(to_hashes([3, 4]))

    # T1 = {1, 2}, T2 = {3, 4}
    # touch [1, 3, 4] - should promote 1 to T2, and move 3,4 to end of T2
    arc_manager.touch(to_hashes([1, 3, 4]))

    # T1 = {2}, T2 = {1, 3, 4} (in that order, with 4 most recent)
    assert len(arc_policy.t1) == 1
    assert len(arc_policy.t2) == 3

    # store block 5, should evict from T1 (block 2, only one in T1)
    prepare_store_output = arc_manager.prepare_store(to_hashes([5]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[5],
            store_block_ids=[1],  # reuses block 2's storage
            block_hashes_evicted=[2],
        ),
    )


def test_arc_manager_failed_store():
    """
    Tests that failed store operations clean up correctly.
    Similar to LRU test but for ARC.
    """
    block_size = 256
    arc_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
    )
    arc_policy = arc_manager._policy
    assert isinstance(arc_policy, ARCCachePolicy)

    # store blocks 1, 2, 3, 4
    arc_manager.prepare_store(to_hashes([1, 2, 3, 4]))
    arc_manager.complete_store(to_hashes([1, 2, 3, 4]))

    # prepare store block 5 (will evict block 1)
    prepare_store_output = arc_manager.prepare_store(to_hashes([5]))
    assert prepare_store_output is not None
    assert len(prepare_store_output.block_hashes_evicted) == 1

    # complete store with failure
    arc_manager.complete_store(to_hashes([5]), success=False)

    # block 5 should not be in cache
    assert arc_manager.lookup(to_hashes([5])) == 0
    # block 5 should not be in T1 or T2
    block_5 = to_hashes([5])[0]
    assert block_5 not in arc_policy.t1
    assert block_5 not in arc_policy.t2

    # evicted block should still be gone (in B1 ghost list)
    evicted_hash = prepare_store_output.block_hashes_evicted[0]
    assert evicted_hash in arc_policy.b1


def test_arc_manager_full_scenario():
    """
    Comprehensive test covering multiple ARC operations in sequence.
    Similar to the full LRU test but adapted for ARC behavior.
    """
    block_size = 256
    arc_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="arc", enable_events=True
    )
    arc_policy = arc_manager._policy
    assert isinstance(arc_policy, ARCCachePolicy)

    # store [1, 2]
    arc_manager.prepare_store(to_hashes([1, 2]))
    arc_manager.complete_store(to_hashes([1, 2]))

    # store [3, 4, 5] -> evicts [1]
    prepare_store_output = arc_manager.prepare_store(to_hashes([3, 4, 5]))
    assert prepare_store_output is not None
    assert len(prepare_store_output.block_hashes_evicted) == 1
    arc_manager.complete_store(to_hashes([3, 4, 5]))

    # promote some blocks to T2
    arc_manager.touch(to_hashes([2, 3]))

    # T1 has {4, 5}, T2 has {2, 3}
    assert len(arc_policy.t1) == 2
    assert len(arc_policy.t2) == 2

    # store [6] -> should evict from T1 (4 is oldest in T1)
    prepare_store_output = arc_manager.prepare_store(to_hashes([6]))
    assert prepare_store_output is not None
    arc_manager.complete_store(to_hashes([6]))

    # verify blocks 2, 3 (in T2) are still present
    assert arc_manager.lookup(to_hashes([2])) == 1
    assert arc_manager.lookup(to_hashes([3])) == 1

    # verify events
    events = list(arc_manager.take_events())
    assert events  # should have store and eviction events


def test_filter_reused_manager():
    """
    Tests FilterReusedOffloadingManager with a CPUOffloadingManager.
    """
    block_size = 256
    lru_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True
    )

    from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager

    manager = FilterReusedOffloadingManager(
        backing=lru_manager, store_threshold=2, max_tracker_size=3
    )

    # Lookup [1, 2] -> 1st time, added to tracker but not eligible for store yet
    assert manager.lookup(to_hashes([1, 2])) == 0

    # prepare store [1, 2] -> should be filtered
    prepare_store_output = manager.prepare_store(to_hashes([1, 2]))
    assert prepare_store_output is not None
    assert prepare_store_output.block_hashes_to_store == []

    # Lookup [1] -> 2nd time, eligible now
    assert manager.lookup(to_hashes([1])) == 0

    # prepare store [1, 2] -> [1] should be eligible, [2] should be filtered
    prepare_store_output = manager.prepare_store(to_hashes([1, 2]))
    assert prepare_store_output is not None
    assert prepare_store_output.block_hashes_to_store == to_hashes([1])

    # Lookup [3, 4] -> 1st time
    # (evicts [2] from tracker since max_size is 3 and tracker has [1])
    assert manager.lookup(to_hashes([3, 4])) == 0
    # Verify [2] was evicted from the tracker (tracker now has: [1], [3], [4])
    assert to_hashes([2])[0] not in manager.counts

    # Lookup [2] again -> (this adds [2] back to the tracker as 1st time)
    assert manager.lookup(to_hashes([2])) == 0
    # Verify [2] was re-added with count=1 (not eligible yet)
    assert manager.counts.get(to_hashes([2])[0]) == 1

    # prepare store [2] -> should still be filtered out since count was reset
    prepare_store_output = manager.prepare_store(to_hashes([2]))
    assert prepare_store_output is not None
    assert prepare_store_output.block_hashes_to_store == []

    manager.complete_store(to_hashes([1]))