test_cpu_manager.py 21.8 KB
Newer Older
1
2
3
4
5
6
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from collections.abc import Iterable
from dataclasses import dataclass

import numpy as np
7
import pytest
8

9
10
11
from vllm.v1.kv_offload.abstract import (
    LoadStoreSpec,
    OffloadingEvent,
12
    OffloadKey,
13
    PrepareStoreOutput,
14
    ReqContext,
15
    make_offload_key,
16
)
17
18
from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager
from vllm.v1.kv_offload.cpu.policies.arc import ARCCachePolicy
19
from vllm.v1.kv_offload.mediums import CPULoadStoreSpec
20
from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager
21
22


23
24
25
26
27
28
29
30
def make_req_context(kv_transfer_params: dict | None = None) -> ReqContext:
    """Build a ReqContext the same way production code builds one per request.

    Args:
        kv_transfer_params: Optional per-request KV-transfer parameters.

    Returns:
        A ReqContext wrapping ``kv_transfer_params``.
    """
    ctx = ReqContext(kv_transfer_params=kv_transfer_params)
    return ctx


# Shared request context for tests that do not depend on per-request params.
_EMPTY_REQ_CTX = make_req_context(kv_transfer_params=None)


31
32
@dataclass
class ExpectedPrepareStoreOutput:
    """Expected result of ``prepare_store()``, expressed with integer hashes."""

    # Integer hashes of the keys prepare_store() should report as to-be-stored.
    keys_to_store: list[int]
    # CPU block IDs expected to be allocated for those keys, in order.
    store_block_ids: list[int]
    # Integer hashes of the keys expected to be evicted to make room.
    evicted_keys: list[int]
37


38
39
40
41
42
43
def to_key(int_hash: int) -> OffloadKey:
    """Convert an integer test hash into an OffloadKey.

    The integer is rendered as its decimal string and encoded to bytes, then
    passed to ``make_offload_key`` with the second argument fixed at ``0``.
    """
    hash_bytes = str(int_hash).encode()
    return make_offload_key(hash_bytes, 0)


def to_keys(int_hashes: list[int]) -> list[OffloadKey]:
    """Convert integer test hashes into OffloadKeys, preserving their order."""
    return list(map(to_key, int_hashes))
44
45
46


def verify_store_output(
    prepare_store_output: PrepareStoreOutput | None,
    expected_prepare_store_output: ExpectedPrepareStoreOutput,
) -> None:
    """Assert that a ``prepare_store()`` result matches the expected values.

    Args:
        prepare_store_output: The value returned by ``prepare_store()``; it
            must not be None.
        expected_prepare_store_output: Expected keys to store, CPU block IDs
            allocated for them, and evicted keys, all given as integer hashes
            (converted via ``to_keys``).
    """
    assert prepare_store_output is not None
    assert prepare_store_output.keys_to_store == to_keys(
        expected_prepare_store_output.keys_to_store
    )
    assert prepare_store_output.evicted_keys == to_keys(
        expected_prepare_store_output.evicted_keys
    )

    store_spec = prepare_store_output.store_spec
    assert isinstance(store_spec, CPULoadStoreSpec)
    # np.array_equal accepts any array-like, so block_ids is compared by value
    # against an int64 array of the expected IDs.
    expected_array = np.array(
        expected_prepare_store_output.store_block_ids, dtype=np.int64
    )
    assert np.array_equal(expected_array, store_spec.block_ids)


65
66
67
def verify_load_output(
    prepare_load_output: LoadStoreSpec, expected_prepare_load_output: list[int]
) -> None:
    """Assert that a ``prepare_load()`` result is a CPU spec with given block IDs.

    Args:
        prepare_load_output: The spec returned by ``prepare_load()``.
        expected_prepare_load_output: Expected CPU block IDs, in order.
    """
    assert isinstance(prepare_load_output, CPULoadStoreSpec)
    expected_array = np.array(expected_prepare_load_output, dtype=np.int64)
    assert np.array_equal(expected_array, prepare_load_output.block_ids)


73
74
75
76
77
def verify_events(
    events: Iterable[OffloadingEvent],
    expected_stores: tuple[set[int], ...] = (),
    expected_evictions: tuple[set[int], ...] = (),
) -> None:
    """Assert that offloading events match the expected stores and evictions.

    Every event must belong to the CPU medium. Events with ``removed`` set are
    treated as evictions; all others are treated as stores. Within each
    category, events are compared in order, and each event's keys are
    compared as a set.

    Args:
        events: Events taken from the manager (e.g. via ``take_events()``).
        expected_stores: One set of integer hashes per expected store event.
        expected_evictions: One set of integer hashes per expected eviction
            event.
    """

    def to_key_sets(
        int_sets: tuple[set[int], ...],
    ) -> tuple[set[OffloadKey], ...]:
        return tuple(set(to_keys(list(int_set))) for int_set in int_sets)

    stores: list[set[OffloadKey]] = []
    evictions: list[set[OffloadKey]] = []
    for event in events:
        assert event.medium == CPULoadStoreSpec.medium()
        if event.removed:
            evictions.append(set(event.keys))
        else:
            stores.append(set(event.keys))

    assert tuple(evictions) == to_key_sets(expected_evictions)
    assert tuple(stores) == to_key_sets(expected_stores)
94
95


96
97
@pytest.mark.parametrize("eviction_policy", ["lru", "arc"])
def test_already_stored_block_not_evicted_during_prepare_store(eviction_policy):
    """
    Regression test: a block that is already stored must not be evicted
    by prepare_store() when it needs to make room for new blocks.
    Applies to both lru and arc policies.

    Scenario:
        - Store blocks [1, 2] and complete.
        - touch([1]) makes block 2 the LRU candidate.
        - prepare_store([2, 3, 4, 5]):
            * block 2 is filtered out as "already stored"
            * but without the fix, block 2 would be evicted as the LRU
              candidate to make room for [3, 4, 5]
        - After complete_store([2, 3, 4, 5]), block 2 must still be present.
    """
    manager = CPUOffloadingManager(
        num_blocks=4,
        cache_policy=eviction_policy,
        enable_events=True,
    )

    # store [1, 2] and complete
    manager.prepare_store(to_keys([1, 2]), _EMPTY_REQ_CTX)
    manager.complete_store(to_keys([1, 2]))

    # touch [1] to make block 2 the LRU candidate
    manager.touch(to_keys([1]))

    # prepare_store([2, 3, 4, 5]):
    #   - block 2 is already stored -> filtered out of keys_to_store
    #   - block 2 must NOT be evicted even though it is the LRU candidate
    #   - block 1 (ID 0) is evicted instead; new blocks [3,4,5] get IDs 2,3,0
    prepare_store_output = manager.prepare_store(to_keys([2, 3, 4, 5]), _EMPTY_REQ_CTX)
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            keys_to_store=[3, 4, 5],
            store_block_ids=[2, 3, 0],
            evicted_keys=[1],  # block 1 evicted, not block 2
        ),
    )

    # complete_store must not silently drop block 2
    manager.complete_store(to_keys([2, 3, 4, 5]))

    # block 2 must still be present in the cache
    assert manager.lookup(to_key(2), _EMPTY_REQ_CTX) is True
144
145


146
147
def test_cpu_manager():
    """
    Tests CPUOffloadingManager with lru policy.
    """
    # initialize a CPU manager with a capacity of 4 blocks
    cpu_manager = CPUOffloadingManager(
        num_blocks=4, cache_policy="lru", enable_events=True
    )

    # prepare store [1, 2]
    prepare_store_output = cpu_manager.prepare_store(to_keys([1, 2]), _EMPTY_REQ_CTX)
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            keys_to_store=[1, 2],
            store_block_ids=[0, 1],
            evicted_keys=[],
        ),
    )

    # lookup [1, 2] -> not ready
    assert cpu_manager.lookup(to_key(1), _EMPTY_REQ_CTX) is False
    assert cpu_manager.lookup(to_key(2), _EMPTY_REQ_CTX) is False

    # no events so far
    assert list(cpu_manager.take_events()) == []

    # complete store [1, 2]
    cpu_manager.complete_store(to_keys([1, 2]))
    verify_events(cpu_manager.take_events(), expected_stores=({1, 2},))

    # lookup [1, 2]
    assert cpu_manager.lookup(to_key(1), _EMPTY_REQ_CTX) is True
    assert cpu_manager.lookup(to_key(2), _EMPTY_REQ_CTX) is True
    assert cpu_manager.lookup(to_key(3), _EMPTY_REQ_CTX) is False

    # prepare store [2, 3, 4, 5] -> evicts [1]
    prepare_store_output = cpu_manager.prepare_store(
        to_keys([2, 3, 4, 5]), _EMPTY_REQ_CTX
    )
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            keys_to_store=[3, 4, 5],
            store_block_ids=[2, 3, 0],
            evicted_keys=[1],
        ),
    )

    # verify eviction event
    verify_events(cpu_manager.take_events(), expected_evictions=({1},))

    # prepare store with no space
    assert cpu_manager.prepare_store(to_keys([1, 6]), _EMPTY_REQ_CTX) is None

    # complete store [2, 3, 4, 5]
    cpu_manager.complete_store(to_keys([2, 3, 4, 5]))

    # lookup (now that we have [2, 3, 4, 5])
    assert cpu_manager.lookup(to_key(1), _EMPTY_REQ_CTX) is False
    assert cpu_manager.lookup(to_key(2), _EMPTY_REQ_CTX) is True
    assert cpu_manager.lookup(to_key(3), _EMPTY_REQ_CTX) is True
    assert cpu_manager.lookup(to_key(4), _EMPTY_REQ_CTX) is True
    assert cpu_manager.lookup(to_key(5), _EMPTY_REQ_CTX) is True
    assert cpu_manager.lookup(to_key(0), _EMPTY_REQ_CTX) is False

    # prepare load [2, 3]
    prepare_load_output = cpu_manager.prepare_load(to_keys([2, 3]), _EMPTY_REQ_CTX)
    verify_load_output(prepare_load_output, [1, 2])

    # prepare store with no space ([2, 3] is being loaded)
    assert cpu_manager.prepare_store(to_keys([6, 7, 8]), _EMPTY_REQ_CTX) is None

    # complete load [2, 3]
    cpu_manager.complete_load(to_keys([2, 3]))

    # prepare store [6, 7, 8] -> evicts [2, 3, 4] (oldest)
    prepare_store_output = cpu_manager.prepare_store(to_keys([6, 7, 8]), _EMPTY_REQ_CTX)
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            keys_to_store=[6, 7, 8],
            store_block_ids=[3, 2, 1],
            evicted_keys=[2, 3, 4],
        ),
    )

    # complete store [6, 7, 8]
    cpu_manager.complete_store(to_keys([6, 7, 8]))

    # touch [5, 6, 7] (move to end of LRU order)
    cpu_manager.touch(to_keys([5, 6, 7]))

    # prepare store [9] -> evicts [8] (oldest following previous touch)
    prepare_store_output = cpu_manager.prepare_store(to_keys([9]), _EMPTY_REQ_CTX)
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            keys_to_store=[9],
            store_block_ids=[1],
            evicted_keys=[8],
        ),
    )

    # complete store [7, 9] with failure ([7] was already stored earlier)
    cpu_manager.complete_store(to_keys([7, 9]), success=False)

    # assert [7] is still stored, but [9] is not
    assert cpu_manager.lookup(to_key(7), _EMPTY_REQ_CTX) is True
    assert cpu_manager.lookup(to_key(9), _EMPTY_REQ_CTX) is False

    verify_events(
        cpu_manager.take_events(),
        expected_stores=({3, 4, 5}, {6, 7, 8}),
        expected_evictions=({2, 3, 4}, {8}),
    )
262
263


264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
class TestARCPolicy:
    """Unit tests for CPUOffloadingManager with ARC eviction policy."""

    def _make_manager(
        self, num_blocks: int = 4, enable_events: bool = True
    ) -> tuple[CPUOffloadingManager, ARCCachePolicy]:
        """Create an ARC-backed manager and return it with its policy object."""
        manager = CPUOffloadingManager(
            num_blocks=num_blocks,
            cache_policy="arc",
            enable_events=enable_events,
        )
        policy = manager._policy
        assert isinstance(policy, ARCCachePolicy)
        return manager, policy

    def test_basic(self):
        """
        Tests CPUOffloadingManager with arc policy.
        Verifies that ARC handles store and lookup operations correctly.
        """
        cpu_manager, arc_policy = self._make_manager()

        # prepare store [1, 2]
        prepare_store_output = cpu_manager.prepare_store(
            to_keys([1, 2]), _EMPTY_REQ_CTX
        )
        verify_store_output(
            prepare_store_output,
            ExpectedPrepareStoreOutput(
                keys_to_store=[1, 2],
                store_block_ids=[0, 1],
                evicted_keys=[],
            ),
        )

        # lookup [1, 2] -> not ready
        assert cpu_manager.lookup(to_key(1), _EMPTY_REQ_CTX) is False
        assert cpu_manager.lookup(to_key(2), _EMPTY_REQ_CTX) is False

        # no events so far
        assert list(cpu_manager.take_events()) == []

        # complete store [1, 2]
        cpu_manager.complete_store(to_keys([1, 2]))
        verify_events(cpu_manager.take_events(), expected_stores=({1, 2},))

        # lookup [1, 2]
        assert cpu_manager.lookup(to_key(1), _EMPTY_REQ_CTX) is True
        assert cpu_manager.lookup(to_key(2), _EMPTY_REQ_CTX) is True
        assert cpu_manager.lookup(to_key(3), _EMPTY_REQ_CTX) is False

        # blocks should be in T1 (recent)
        assert len(arc_policy.t1) == 2
        assert len(arc_policy.t2) == 0

    def test_t1_to_t2_promotion(self):
        """
        Tests that accessing a block in T1 promotes it to T2 (frequent).
        This is a key feature of ARC's adaptive behavior.
        """
        cpu_manager, arc_policy = self._make_manager(enable_events=False)

        # store and complete block 1
        cpu_manager.prepare_store(to_keys([1]), _EMPTY_REQ_CTX)
        cpu_manager.complete_store(to_keys([1]))

        # block 1 starts in T1 (recent)
        assert to_key(1) in arc_policy.t1
        assert to_key(1) not in arc_policy.t2

        # touch block 1 (simulate second access)
        cpu_manager.touch(to_keys([1]))

        # block 1 should now be in T2 (frequent)
        assert to_key(1) not in arc_policy.t1
        assert to_key(1) in arc_policy.t2

    def test_eviction_with_load(self):
        """
        Tests ARC eviction behavior similar to LRU test.
        Verifies that blocks being loaded (ref_cnt > 0) cannot be evicted.
        """
        cpu_manager, _ = self._make_manager()

        # prepare and complete store [1, 2, 3, 4]
        prepare_store_output = cpu_manager.prepare_store(
            to_keys([1, 2, 3, 4]), _EMPTY_REQ_CTX
        )
        verify_store_output(
            prepare_store_output,
            ExpectedPrepareStoreOutput(
                keys_to_store=[1, 2, 3, 4],
                store_block_ids=[0, 1, 2, 3],
                evicted_keys=[],
            ),
        )
        cpu_manager.complete_store(to_keys([1, 2, 3, 4]))

        # prepare load [2, 3] (increases ref_cnt)
        prepare_load_output = cpu_manager.prepare_load(to_keys([2, 3]), _EMPTY_REQ_CTX)
        verify_load_output(prepare_load_output, [1, 2])

        # prepare store [5, 6, 7] with [2, 3] being loaded
        # should fail because [2, 3] have ref_cnt > 0
        assert cpu_manager.prepare_store(to_keys([5, 6, 7]), _EMPTY_REQ_CTX) is None

        # complete load [2, 3]
        cpu_manager.complete_load(to_keys([2, 3]))

        # now prepare store [5, 6, 7] should succeed
        # ARC will evict blocks one at a time from T1 as needed
        prepare_store_output = cpu_manager.prepare_store(
            to_keys([5, 6, 7]), _EMPTY_REQ_CTX
        )
        assert prepare_store_output is not None
        # Should successfully evict enough blocks to make room (at least 1)
        assert len(prepare_store_output.evicted_keys) >= 1

    def test_adaptive_target(self):
        """
        Tests ARC's adaptive target adjustment via ghost lists.
        When a block in B1 (ghost list) is accessed, target_t1_size increases.
        When a block in B2 is accessed, target_t1_size decreases.
        """
        cpu_manager, arc_policy = self._make_manager(num_blocks=2, enable_events=False)

        # store blocks 1, 2 (fills cache)
        cpu_manager.prepare_store(to_keys([1, 2]), _EMPTY_REQ_CTX)
        cpu_manager.complete_store(to_keys([1, 2]))

        initial_target = arc_policy.target_t1_size

        # store block 3, evicting block 1 (moves to B1 ghost list)
        cpu_manager.prepare_store(to_keys([3]), _EMPTY_REQ_CTX)
        cpu_manager.complete_store(to_keys([3]))

        # block 1 should be in B1 (ghost list)
        assert to_key(1) in arc_policy.b1

        # touch block 1 (cache miss, but in B1)
        # this should increase target_t1_size (favor recency)
        cpu_manager.touch(to_keys([1]))

        # target should have increased
        assert arc_policy.target_t1_size > initial_target

    def test_t1_t2_eviction_policy(self):
        """
        Tests that ARC evicts from T1 or T2 based on target_t1_size.
        If |T1| >= target_t1_size, evict from T1, otherwise from T2.
        """
        cpu_manager, arc_policy = self._make_manager(enable_events=False)

        # store blocks 1, 2, 3, 4
        cpu_manager.prepare_store(to_keys([1, 2, 3, 4]), _EMPTY_REQ_CTX)
        cpu_manager.complete_store(to_keys([1, 2, 3, 4]))

        # promote blocks 3, 4 to T2 by touching them
        cpu_manager.touch(to_keys([3, 4]))

        # now: T1 = {1, 2}, T2 = {3, 4}
        assert len(arc_policy.t1) == 2
        assert len(arc_policy.t2) == 2

        # set target_t1_size to prefer evicting from T1
        # (when |T1| >= target, evict from T1)
        arc_policy.target_t1_size = 1

        # store block 5, should evict from T1 (block 1, LRU in T1)
        output = cpu_manager.prepare_store(to_keys([5]), _EMPTY_REQ_CTX)
        assert output is not None
        assert to_keys([1]) == output.evicted_keys

        cpu_manager.complete_store(to_keys([5]))

        # block 1 should be in B1 (ghost list)
        assert to_key(1) in arc_policy.b1
        # block 5 should be in T1
        assert to_key(5) in arc_policy.t1

    def test_ghost_list_bounds(self):
        """
        Tests that ghost lists (B1, B2) don't grow unbounded.
        They should be capped at cache_capacity.
        """
        cpu_manager, arc_policy = self._make_manager(num_blocks=2, enable_events=False)

        # fill cache with blocks 1, 2
        cpu_manager.prepare_store(to_keys([1, 2]), _EMPTY_REQ_CTX)
        cpu_manager.complete_store(to_keys([1, 2]))

        # store many blocks to fill ghost lists
        for i in range(3, 20):
            cpu_manager.prepare_store(to_keys([i]), _EMPTY_REQ_CTX)
            cpu_manager.complete_store(to_keys([i]))

        # ghost lists should not exceed cache_capacity
        assert len(arc_policy.b1) <= arc_policy.cache_capacity
        assert len(arc_policy.b2) <= arc_policy.cache_capacity

    def test_touch_ordering(self):
        """
        Tests that touch() correctly updates access patterns.
        Similar to LRU test but verifies T1/T2 ordering.
        """
        cpu_manager, arc_policy = self._make_manager()

        # store blocks 1, 2, 3, 4
        cpu_manager.prepare_store(to_keys([1, 2, 3, 4]), _EMPTY_REQ_CTX)
        cpu_manager.complete_store(to_keys([1, 2, 3, 4]))

        # promote 3, 4 to T2
        cpu_manager.touch(to_keys([3, 4]))

        # T1 = {1, 2}, T2 = {3, 4}
        # touch [1, 3, 4] - should promote 1 to T2, and move 3,4 to end of T2
        cpu_manager.touch(to_keys([1, 3, 4]))

        # T1 = {2}, T2 = {1, 3, 4} (in that order, with 4 most recent)
        assert len(arc_policy.t1) == 1
        assert len(arc_policy.t2) == 3

        # store block 5, should evict from T1 (block 2, only one in T1)
        prepare_store_output = cpu_manager.prepare_store(to_keys([5]), _EMPTY_REQ_CTX)
        verify_store_output(
            prepare_store_output,
            ExpectedPrepareStoreOutput(
                keys_to_store=[5],
                store_block_ids=[1],  # reuses block 2's storage
                evicted_keys=[2],
            ),
        )

    def test_failed_store(self):
        """
        Tests that failed store operations clean up correctly.
        Similar to LRU test but for ARC.
        """
        cpu_manager, arc_policy = self._make_manager()

        # store blocks 1, 2, 3, 4
        cpu_manager.prepare_store(to_keys([1, 2, 3, 4]), _EMPTY_REQ_CTX)
        cpu_manager.complete_store(to_keys([1, 2, 3, 4]))

        # prepare store block 5 (will evict block 1)
        prepare_store_output = cpu_manager.prepare_store(to_keys([5]), _EMPTY_REQ_CTX)
        assert prepare_store_output is not None
        assert len(prepare_store_output.evicted_keys) == 1

        # complete store with failure
        cpu_manager.complete_store(to_keys([5]), success=False)

        # block 5 should not be in cache
        assert cpu_manager.lookup(to_key(5), _EMPTY_REQ_CTX) is False
        # block 5 should not be in T1 or T2
        assert to_key(5) not in arc_policy.t1
        assert to_key(5) not in arc_policy.t2

        # evicted block should still be gone (in B1 ghost list)
        evicted_key = prepare_store_output.evicted_keys[0]
        assert evicted_key in arc_policy.b1

    def test_full_scenario(self):
        """
        Comprehensive test covering multiple ARC operations in sequence.
        Similar to the full LRU test but adapted for ARC behavior.
        """
        cpu_manager, arc_policy = self._make_manager()

        # store [1, 2]
        cpu_manager.prepare_store(to_keys([1, 2]), _EMPTY_REQ_CTX)
        cpu_manager.complete_store(to_keys([1, 2]))

        # store [3, 4, 5] -> evicts [1]
        prepare_store_output = cpu_manager.prepare_store(
            to_keys([3, 4, 5]), _EMPTY_REQ_CTX
        )
        assert prepare_store_output is not None
        assert len(prepare_store_output.evicted_keys) == 1
        cpu_manager.complete_store(to_keys([3, 4, 5]))

        # promote some blocks to T2
        cpu_manager.touch(to_keys([2, 3]))

        # T1 has {4, 5}, T2 has {2, 3}
        assert len(arc_policy.t1) == 2
        assert len(arc_policy.t2) == 2

        # store [6] -> should evict from T1 (4 is oldest in T1)
        prepare_store_output = cpu_manager.prepare_store(to_keys([6]), _EMPTY_REQ_CTX)
        assert prepare_store_output is not None
        cpu_manager.complete_store(to_keys([6]))

        # verify blocks 2, 3 (in T2) are still present
        assert cpu_manager.lookup(to_key(2), _EMPTY_REQ_CTX) is True
        assert cpu_manager.lookup(to_key(3), _EMPTY_REQ_CTX) is True

        # verify events
        events = list(cpu_manager.take_events())
        assert len(events) > 0  # should have store and eviction events
564
565
566
567


def test_filter_reused_manager():
    """
    Tests FilterReusedOffloadingManager with a CPUOffloadingManager.
    """
    lru_manager = CPUOffloadingManager(
        num_blocks=4, cache_policy="lru", enable_events=True
    )

    manager = FilterReusedOffloadingManager(
        backing=lru_manager, store_threshold=2, max_tracker_size=3
    )

    # Lookup [1, 2] -> 1st time, added to tracker but not eligible for store yet
    assert manager.lookup(to_key(1), _EMPTY_REQ_CTX) is False
    assert manager.lookup(to_key(2), _EMPTY_REQ_CTX) is False

    # prepare store [1, 2] -> should be filtered
    prepare_store_output = manager.prepare_store(to_keys([1, 2]), _EMPTY_REQ_CTX)
    assert prepare_store_output is not None
    assert prepare_store_output.keys_to_store == []

    # Lookup [1] -> 2nd time, eligible now
    assert manager.lookup(to_key(1), _EMPTY_REQ_CTX) is False

    # prepare store [1, 2] -> [1] should be eligible, [2] should be filtered
    prepare_store_output = manager.prepare_store(to_keys([1, 2]), _EMPTY_REQ_CTX)
    assert prepare_store_output is not None
    assert prepare_store_output.keys_to_store == to_keys([1])

    # Lookup [3, 4] -> 1st time.
    # The tracker holds [1] and [2] and is capped at max_tracker_size=3, so
    # adding [3] and [4] evicts [2], the least recently looked-up entry.
    assert manager.lookup(to_key(3), _EMPTY_REQ_CTX) is False
    assert manager.lookup(to_key(4), _EMPTY_REQ_CTX) is False
    # Verify [2] was evicted from the tracker (tracker now has: [1], [3], [4])
    assert to_key(2) not in manager.counts

    # Lookup [2] again -> (this adds [2] back to the tracker as 1st time)
    assert manager.lookup(to_key(2), _EMPTY_REQ_CTX) is False
    # Verify [2] was re-added with count=1 (not eligible yet)
    assert manager.counts.get(to_key(2)) == 1

    # prepare store [2] -> should still be filtered out since count was reset
    prepare_store_output = manager.prepare_store(to_keys([2]), _EMPTY_REQ_CTX)
    assert prepare_store_output is not None
    assert prepare_store_output.keys_to_store == []

    manager.complete_store(to_keys([1]))