# test_cpu_manager.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from collections.abc import Iterable
from dataclasses import dataclass

import numpy as np
7
import pytest
8
9

from vllm.v1.core.kv_cache_utils import BlockHash
10
11
12
13
14
from vllm.v1.kv_offload.abstract import (
    LoadStoreSpec,
    OffloadingEvent,
    PrepareStoreOutput,
)
15
16
from vllm.v1.kv_offload.cpu.manager import CPUOffloadingManager
from vllm.v1.kv_offload.cpu.policies.arc import ARCCachePolicy
17
from vllm.v1.kv_offload.mediums import CPULoadStoreSpec
18
from vllm.v1.kv_offload.reuse_manager import FilterReusedOffloadingManager
19
20
21
22
23
24
25
26
27
28
29
30
31
32


@dataclass
class ExpectedPrepareStoreOutput:
    """Expected ``prepare_store`` result, written with plain ints for readability.

    Hash fields hold ints that ``to_hashes`` turns into ``BlockHash`` values;
    ``store_block_ids`` holds the CPU block IDs the store should be assigned.
    """

    # ints converted to BlockHash before comparing with the real output
    block_hashes_to_store: list[int]
    # CPU block IDs expected in the returned store spec, in order
    store_block_ids: list[int]
    # ints for the hashes expected to be evicted to make room
    block_hashes_evicted: list[int]


def to_hashes(int_hashes: list[int]) -> list[BlockHash]:
    """Map each int to a ``BlockHash`` built from its decimal text, as bytes."""
    encoded = (str(value).encode() for value in int_hashes)
    return list(map(BlockHash, encoded))


def verify_store_output(
    prepare_store_output: PrepareStoreOutput | None,
    expected_prepare_store_output: ExpectedPrepareStoreOutput,
):
    """Assert that a ``prepare_store`` result matches the expected output.

    Args:
        prepare_store_output: Value returned by ``prepare_store``; must not be
            ``None`` (``None`` means the store could not be prepared).
        expected_prepare_store_output: Expected hashes to store, CPU block IDs
            assigned to them, and evicted hashes, all given as plain ints.
    """
    assert prepare_store_output is not None
    assert prepare_store_output.block_hashes_to_store == to_hashes(
        expected_prepare_store_output.block_hashes_to_store
    )
    assert prepare_store_output.block_hashes_evicted == to_hashes(
        expected_prepare_store_output.block_hashes_evicted
    )

    store_spec = prepare_store_output.store_spec
    assert isinstance(store_spec, CPULoadStoreSpec)
    # Compare as int64 arrays so order and values of block IDs both matter.
    expected_array = np.array(
        expected_prepare_store_output.store_block_ids, dtype=np.int64
    )
    assert np.array_equal(expected_array, store_spec.block_ids)


51
52
53
def verify_load_output(
    prepare_load_output: LoadStoreSpec, expected_prepare_load_output: list[int]
):
    """Assert a ``prepare_load`` result is a CPU spec over the expected IDs.

    Args:
        prepare_load_output: Spec returned by ``prepare_load``.
        expected_prepare_load_output: Expected CPU block IDs, in order.
    """
    assert isinstance(prepare_load_output, CPULoadStoreSpec)
    expected_array = np.array(expected_prepare_load_output, dtype=np.int64)
    assert np.array_equal(expected_array, prepare_load_output.block_ids)


59
60
61
62
63
64
def verify_events(
    events: Iterable[OffloadingEvent],
    block_size: int,
    expected_stores: tuple[set[int], ...] = (),
    expected_evictions: tuple[set[int], ...] = (),
):
    """Assert that ``events`` contain exactly the expected stores and evictions.

    Every event must target the CPU medium and carry ``block_size``. Events
    are split into evictions (``event.removed`` truthy) and stores. Each group
    is compared in emission order, while the hashes inside a single event are
    compared as a set.

    Args:
        events: Events drained from the manager (e.g. via ``take_events()``).
        block_size: Block size every event is expected to report.
        expected_stores: One set of int hashes per expected store event.
        expected_evictions: One set of int hashes per expected eviction event.
    """
    stores: list[set[BlockHash]] = []
    evictions: list[set[BlockHash]] = []
    for event in events:
        assert event.medium == CPULoadStoreSpec.medium()
        assert event.block_size == block_size
        if event.removed:
            evictions.append(set(event.block_hashes))
        else:
            stores.append(set(event.block_hashes))

    def to_hash_sets(int_sets: tuple[set[int], ...]) -> tuple[set[BlockHash], ...]:
        # Iteration order of each int set is irrelevant: the result is a set.
        return tuple(set(to_hashes(list(int_set))) for int_set in int_sets)

    assert tuple(evictions) == to_hash_sets(expected_evictions)
    assert tuple(stores) == to_hash_sets(expected_stores)


82
83
@pytest.mark.parametrize("eviction_policy", ["lru", "arc"])
def test_already_stored_block_not_evicted_during_prepare_store(eviction_policy):
    """
    Regression test: a block that is already stored must not be evicted
    by prepare_store() when it needs to make room for new blocks.
    Applies to both lru and arc policies.

    Scenario:
        - Store blocks [1, 2] and complete.
        - touch([1]) makes block 2 the LRU candidate.
        - prepare_store([2, 3, 4, 5]):
            * block 2 is filtered out as "already stored"
            * but without the fix, block 2 would be evicted as the LRU
              candidate to make room for [3, 4, 5]
        - After complete_store([2, 3, 4, 5]), block 2 must still be present.
    """
    block_size = 256
    manager = CPUOffloadingManager(
        block_size=block_size,
        num_blocks=4,
        cache_policy=eviction_policy,
        enable_events=True,
    )

    # store [1, 2] and complete
    manager.prepare_store(to_hashes([1, 2]))
    manager.complete_store(to_hashes([1, 2]))

    # touch [1] to make block 2 the LRU candidate
    manager.touch(to_hashes([1]))

    # prepare_store([2, 3, 4, 5]):
    #   - block 2 is already stored → filtered out of block_hashes_to_store
    #   - block 2 must NOT be evicted even though it is the LRU candidate
    #   - block 1 (ID 0) is evicted instead; new blocks [3,4,5] get IDs 2,3,0
    prepare_store_output = manager.prepare_store(to_hashes([2, 3, 4, 5]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[3, 4, 5],
            store_block_ids=[2, 3, 0],
            block_hashes_evicted=[1],  # block 1 evicted, not block 2
        ),
    )

    # complete_store must not silently drop block 2
    manager.complete_store(to_hashes([2, 3, 4, 5]))

    # block 2 must still be present in the cache
    assert manager.lookup(to_hashes([2])) == 1


134
135
def test_cpu_manager():
    """
    Tests CPUOffloadingManager with lru policy.

    Walks a 4-block manager through store, lookup, load pinning, eviction,
    touch reordering and a failed store, checking emitted events on the way.
    """
    # initialize a CPU backend with a capacity of 4 blocks
    block_size = 256
    cpu_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True
    )

    # prepare store [1, 2]
    prepare_store_output = cpu_manager.prepare_store(to_hashes([1, 2]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[1, 2],
            store_block_ids=[0, 1],
            block_hashes_evicted=[],
        ),
    )

    # lookup [1, 2] -> not ready
    assert cpu_manager.lookup(to_hashes([1, 2])) == 0

    # no events so far
    assert list(cpu_manager.take_events()) == []

    # complete store [1, 2]
    cpu_manager.complete_store(to_hashes([1, 2]))
    verify_events(
        cpu_manager.take_events(), block_size=block_size, expected_stores=({1, 2},)
    )

    # lookup [1, 2]
    assert cpu_manager.lookup(to_hashes([1])) == 1
    assert cpu_manager.lookup(to_hashes([1, 2])) == 2
    assert cpu_manager.lookup(to_hashes([1, 2, 3])) == 2

    # prepare store [2, 3, 4, 5] -> evicts [1]
    # (2 is already stored, so only [3, 4, 5] are returned for storing)
    prepare_store_output = cpu_manager.prepare_store(to_hashes([2, 3, 4, 5]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[3, 4, 5],
            store_block_ids=[2, 3, 0],
            block_hashes_evicted=[1],
        ),
    )

    # verify eviction event
    verify_events(
        cpu_manager.take_events(), block_size=block_size, expected_evictions=({1},)
    )

    # prepare store with no space
    assert cpu_manager.prepare_store(to_hashes([1, 6])) is None

    # complete store [2, 3, 4, 5]
    cpu_manager.complete_store(to_hashes([2, 3, 4, 5]))

    # prepare load [2, 3]
    prepare_load_output = cpu_manager.prepare_load(to_hashes([2, 3]))
    verify_load_output(prepare_load_output, [1, 2])

    # prepare store with no space ([2, 3] is being loaded)
    assert cpu_manager.prepare_store(to_hashes([6, 7, 8])) is None

    # complete load [2, 3]
    cpu_manager.complete_load(to_hashes([2, 3]))

    # prepare store [6, 7, 8] -> evicts [2, 3, 4] (oldest)
    prepare_store_output = cpu_manager.prepare_store(to_hashes([6, 7, 8]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[6, 7, 8],
            store_block_ids=[3, 2, 1],
            block_hashes_evicted=[2, 3, 4],
        ),
    )

    # complete store [6, 7, 8]
    cpu_manager.complete_store(to_hashes([6, 7, 8]))

    # touch [5, 6, 7] (move to end of LRU order)
    cpu_manager.touch(to_hashes([5, 6, 7]))

    # prepare store [9] -> evicts [8] (oldest following previous touch)
    prepare_store_output = cpu_manager.prepare_store(to_hashes([9]))
    verify_store_output(
        prepare_store_output,
        ExpectedPrepareStoreOutput(
            block_hashes_to_store=[9],
            store_block_ids=[1],
            block_hashes_evicted=[8],
        ),
    )

    # complete store [7, 9] with failure: 7 was already stored, so a failed
    # completion must leave it in place, while the pending 9 is discarded
    cpu_manager.complete_store(to_hashes([7, 9]), success=False)

    # assert [7] is still stored, but [9] is not
    assert cpu_manager.lookup(to_hashes([7])) == 1
    assert cpu_manager.lookup(to_hashes([9])) == 0

    # events since the last take_events(); the failed store of [9] is absent
    verify_events(
        cpu_manager.take_events(),
        block_size=block_size,
        expected_stores=({3, 4, 5}, {6, 7, 8}),
        expected_evictions=({2, 3, 4}, {8}),
    )
245
246


247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
class TestARCPolicy:
    """Unit tests for CPUOffloadingManager with ARC eviction policy.

    Several tests inspect the policy's internals directly: ``t1``/``t2``
    (lists holding stored blocks), ``b1``/``b2`` (ghost lists that receive
    evicted hashes), ``target_t1_size`` and ``cache_capacity``.
    """

    def _make_manager(
        self, num_blocks: int = 4, enable_events: bool = True
    ) -> tuple[CPUOffloadingManager, ARCCachePolicy]:
        """Create an ARC manager (block_size=256) and return it with its policy.

        The policy is read from the manager's private ``_policy`` attribute so
        tests can inspect ARC's internal lists directly.
        """
        manager = CPUOffloadingManager(
            block_size=256,
            num_blocks=num_blocks,
            cache_policy="arc",
            enable_events=enable_events,
        )
        policy = manager._policy
        assert isinstance(policy, ARCCachePolicy)
        return manager, policy

    def test_basic(self):
        """
        Tests CPUOffloadingManager with arc policy.
        Verifies that ARC handles store, load, and lookup operations correctly.
        """
        cpu_manager, arc_policy = self._make_manager()

        # prepare store [1, 2]
        prepare_store_output = cpu_manager.prepare_store(to_hashes([1, 2]))
        verify_store_output(
            prepare_store_output,
            ExpectedPrepareStoreOutput(
                block_hashes_to_store=[1, 2],
                store_block_ids=[0, 1],
                block_hashes_evicted=[],
            ),
        )

        # lookup [1, 2] -> not ready
        assert cpu_manager.lookup(to_hashes([1, 2])) == 0

        # no events so far
        assert list(cpu_manager.take_events()) == []

        # complete store [1, 2]
        cpu_manager.complete_store(to_hashes([1, 2]))
        verify_events(
            cpu_manager.take_events(), block_size=256, expected_stores=({1, 2},)
        )

        # lookup [1, 2]
        assert cpu_manager.lookup(to_hashes([1])) == 1
        assert cpu_manager.lookup(to_hashes([1, 2])) == 2
        assert cpu_manager.lookup(to_hashes([1, 2, 3])) == 2

        # blocks should be in T1 (recent)
        assert len(arc_policy.t1) == 2
        assert len(arc_policy.t2) == 0

    def test_t1_to_t2_promotion(self):
        """
        Tests that accessing a block in T1 promotes it to T2 (frequent).
        This is a key feature of ARC's adaptive behavior.
        """
        cpu_manager, arc_policy = self._make_manager(enable_events=False)

        # store and complete block 1
        cpu_manager.prepare_store(to_hashes([1]))
        cpu_manager.complete_store(to_hashes([1]))

        # block 1 starts in T1 (recent)
        assert to_hashes([1])[0] in arc_policy.t1
        assert to_hashes([1])[0] not in arc_policy.t2

        # touch block 1 (simulate second access)
        cpu_manager.touch(to_hashes([1]))

        # block 1 should now be in T2 (frequent)
        assert to_hashes([1])[0] not in arc_policy.t1
        assert to_hashes([1])[0] in arc_policy.t2

    def test_eviction_with_load(self):
        """
        Tests ARC eviction behavior similar to LRU test.
        Verifies that blocks being loaded (ref_cnt > 0) cannot be evicted.
        """
        cpu_manager, _ = self._make_manager()

        # prepare and complete store [1, 2, 3, 4]
        prepare_store_output = cpu_manager.prepare_store(to_hashes([1, 2, 3, 4]))
        verify_store_output(
            prepare_store_output,
            ExpectedPrepareStoreOutput(
                block_hashes_to_store=[1, 2, 3, 4],
                store_block_ids=[0, 1, 2, 3],
                block_hashes_evicted=[],
            ),
        )
        cpu_manager.complete_store(to_hashes([1, 2, 3, 4]))

        # prepare load [2, 3] (increases ref_cnt)
        prepare_load_output = cpu_manager.prepare_load(to_hashes([2, 3]))
        verify_load_output(prepare_load_output, [1, 2])

        # prepare store [5, 6, 7] with [2, 3] being loaded
        # should fail because [2, 3] have ref_cnt > 0
        assert cpu_manager.prepare_store(to_hashes([5, 6, 7])) is None

        # complete load [2, 3]
        cpu_manager.complete_load(to_hashes([2, 3]))

        # now prepare store [5, 6, 7] should succeed
        # which blocks ARC picks depends on its T1/T2 split, so only a lower
        # bound on the number of evictions is asserted below
        prepare_store_output = cpu_manager.prepare_store(to_hashes([5, 6, 7]))
        assert prepare_store_output is not None
        # Should successfully evict enough blocks to make room (at least 1)
        assert len(prepare_store_output.block_hashes_evicted) >= 1

    def test_adaptive_target(self):
        """
        Tests ARC's adaptive target adjustment via ghost lists.
        When a block in B1 (ghost list) is accessed, target_t1_size increases.
        When a block in B2 is accessed, target_t1_size decreases.
        (Only the B1 -> increase direction is exercised below.)
        """
        cpu_manager, arc_policy = self._make_manager(num_blocks=2, enable_events=False)

        # store blocks 1, 2 (fills cache)
        cpu_manager.prepare_store(to_hashes([1, 2]))
        cpu_manager.complete_store(to_hashes([1, 2]))

        initial_target = arc_policy.target_t1_size

        # store block 3, evicting block 1 (moves to B1 ghost list)
        cpu_manager.prepare_store(to_hashes([3]))
        cpu_manager.complete_store(to_hashes([3]))

        # block 1 should be in B1 (ghost list)
        assert to_hashes([1])[0] in arc_policy.b1

        # touch block 1 (cache miss, but in B1)
        # this should increase target_t1_size (favor recency)
        cpu_manager.touch(to_hashes([1]))

        # target should have increased
        assert arc_policy.target_t1_size > initial_target

    def test_t1_t2_eviction_policy(self):
        """
        Tests that ARC evicts from T1 or T2 based on target_t1_size.
        If |T1| >= target_t1_size, evict from T1, otherwise from T2.
        """
        cpu_manager, arc_policy = self._make_manager(enable_events=False)

        # store blocks 1, 2, 3, 4
        cpu_manager.prepare_store(to_hashes([1, 2, 3, 4]))
        cpu_manager.complete_store(to_hashes([1, 2, 3, 4]))

        # promote blocks 3, 4 to T2 by touching them
        cpu_manager.touch(to_hashes([3, 4]))

        # now: T1 = {1, 2}, T2 = {3, 4}
        assert len(arc_policy.t1) == 2
        assert len(arc_policy.t2) == 2

        # set target_t1_size to prefer evicting from T1
        # (when |T1| >= target, evict from T1)
        # NOTE: this pokes the policy's internal state directly
        arc_policy.target_t1_size = 1

        # store block 5, should evict from T1 (block 1, LRU in T1)
        output = cpu_manager.prepare_store(to_hashes([5]))
        assert output is not None
        assert to_hashes([1]) == output.block_hashes_evicted

        cpu_manager.complete_store(to_hashes([5]))

        # block 1 should be in B1 (ghost list)
        assert to_hashes([1])[0] in arc_policy.b1
        # block 5 should be in T1
        assert to_hashes([5])[0] in arc_policy.t1

    def test_ghost_list_bounds(self):
        """
        Tests that ghost lists (B1, B2) don't grow unbounded.
        They should be capped at cache_capacity.
        """
        cpu_manager, arc_policy = self._make_manager(num_blocks=2, enable_events=False)

        # fill cache with blocks 1, 2
        cpu_manager.prepare_store(to_hashes([1, 2]))
        cpu_manager.complete_store(to_hashes([1, 2]))

        # store many blocks to fill ghost lists
        for i in range(3, 20):
            cpu_manager.prepare_store(to_hashes([i]))
            cpu_manager.complete_store(to_hashes([i]))

        # ghost lists should not exceed cache_capacity
        assert len(arc_policy.b1) <= arc_policy.cache_capacity
        assert len(arc_policy.b2) <= arc_policy.cache_capacity

    def test_touch_ordering(self):
        """
        Tests that touch() correctly updates access patterns.
        Similar to LRU test but verifies T1/T2 ordering.
        """
        cpu_manager, arc_policy = self._make_manager()

        # store blocks 1, 2, 3, 4
        cpu_manager.prepare_store(to_hashes([1, 2, 3, 4]))
        cpu_manager.complete_store(to_hashes([1, 2, 3, 4]))

        # promote 3, 4 to T2
        cpu_manager.touch(to_hashes([3, 4]))

        # T1 = {1, 2}, T2 = {3, 4}
        # touch [1, 3, 4] - should promote 1 to T2, and move 3,4 to end of T2
        cpu_manager.touch(to_hashes([1, 3, 4]))

        # T1 = {2}, T2 = {1, 3, 4}; only the list sizes are asserted here,
        # the relative order inside T2 is not checked
        assert len(arc_policy.t1) == 1
        assert len(arc_policy.t2) == 3

        # store block 5, should evict from T1 (block 2, only one in T1)
        prepare_store_output = cpu_manager.prepare_store(to_hashes([5]))
        verify_store_output(
            prepare_store_output,
            ExpectedPrepareStoreOutput(
                block_hashes_to_store=[5],
                store_block_ids=[1],  # reuses block 2's storage
                block_hashes_evicted=[2],
            ),
        )

    def test_failed_store(self):
        """
        Tests that failed store operations clean up correctly.
        Similar to LRU test but for ARC.
        """
        cpu_manager, arc_policy = self._make_manager()

        # store blocks 1, 2, 3, 4
        cpu_manager.prepare_store(to_hashes([1, 2, 3, 4]))
        cpu_manager.complete_store(to_hashes([1, 2, 3, 4]))

        # prepare store block 5 (will evict block 1)
        # (only the eviction count is asserted; the victim is read back below)
        prepare_store_output = cpu_manager.prepare_store(to_hashes([5]))
        assert prepare_store_output is not None
        assert len(prepare_store_output.block_hashes_evicted) == 1

        # complete store with failure
        cpu_manager.complete_store(to_hashes([5]), success=False)

        # block 5 should not be in cache
        assert cpu_manager.lookup(to_hashes([5])) == 0
        # block 5 should not be in T1 or T2
        assert to_hashes([5])[0] not in arc_policy.t1
        assert to_hashes([5])[0] not in arc_policy.t2

        # evicted block should still be gone (in B1 ghost list)
        evicted_hash = prepare_store_output.block_hashes_evicted[0]
        assert evicted_hash in arc_policy.b1

    def test_full_scenario(self):
        """
        Comprehensive test covering multiple ARC operations in sequence.
        Similar to the full LRU test but adapted for ARC behavior.
        """
        cpu_manager, arc_policy = self._make_manager()

        # store [1, 2]
        cpu_manager.prepare_store(to_hashes([1, 2]))
        cpu_manager.complete_store(to_hashes([1, 2]))

        # store [3, 4, 5] -> evicts [1]
        # (only the eviction count is asserted)
        prepare_store_output = cpu_manager.prepare_store(to_hashes([3, 4, 5]))
        assert prepare_store_output is not None
        assert len(prepare_store_output.block_hashes_evicted) == 1
        cpu_manager.complete_store(to_hashes([3, 4, 5]))

        # promote some blocks to T2
        cpu_manager.touch(to_hashes([2, 3]))

        # T1 has {4, 5}, T2 has {2, 3}
        assert len(arc_policy.t1) == 2
        assert len(arc_policy.t2) == 2

        # store [6] -> should evict from T1 (4 is oldest in T1)
        # (the victim is not asserted; only T2 survivors are checked below)
        prepare_store_output = cpu_manager.prepare_store(to_hashes([6]))
        assert prepare_store_output is not None
        cpu_manager.complete_store(to_hashes([6]))

        # verify blocks 2, 3 (in T2) are still present
        assert cpu_manager.lookup(to_hashes([2])) == 1
        assert cpu_manager.lookup(to_hashes([3])) == 1

        # verify events
        events = list(cpu_manager.take_events())
        assert len(events) > 0  # should have store and eviction events
541
542
543
544


def test_filter_reused_manager():
    """
545
    Tests FilterReusedOffloadingManager with a CPUOffloadingManager.
546
547
    """
    block_size = 256
548
549
550
    lru_manager = CPUOffloadingManager(
        block_size=block_size, num_blocks=4, cache_policy="lru", enable_events=True
    )
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588

    manager = FilterReusedOffloadingManager(
        backing=lru_manager, store_threshold=2, max_tracker_size=3
    )

    # Lookup [1, 2] -> 1st time, added to tracker but not eligible for store yet
    assert manager.lookup(to_hashes([1, 2])) == 0

    # prepare store [1, 2] -> should be filtered
    prepare_store_output = manager.prepare_store(to_hashes([1, 2]))
    assert prepare_store_output is not None
    assert prepare_store_output.block_hashes_to_store == []

    # Lookup [1] -> 2nd time, eligible now
    assert manager.lookup(to_hashes([1])) == 0

    # prepare store [1, 2] -> [1] should be eligible, [2] should be filtered
    prepare_store_output = manager.prepare_store(to_hashes([1, 2]))
    assert prepare_store_output is not None
    assert prepare_store_output.block_hashes_to_store == to_hashes([1])

    # Lookup [3, 4] -> 1st time
    # (evicts [2] from tracker since max_size is 3 and tracker has [1])
    assert manager.lookup(to_hashes([3, 4])) == 0
    # Verify [2] was evicted from the tracker (tracker now has: [1], [3], [4])
    assert to_hashes([2])[0] not in manager.counts

    # Lookup [2] again -> (this adds [2] back to the tracker as 1st time)
    assert manager.lookup(to_hashes([2])) == 0
    # Verify [2] was re-added with count=1 (not eligible yet)
    assert manager.counts.get(to_hashes([2])[0]) == 1

    # prepare store [2] -> should still be filtered out since count was reset
    prepare_store_output = manager.prepare_store(to_hashes([2]))
    assert prepare_store_output is not None
    assert prepare_store_output.block_hashes_to_store == []

    manager.complete_store(to_hashes([1]))