test_cpu_gpu.py 6.94 KB
Newer Older
1
2
3
4
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import random
import time
5
import uuid
6
7
8
9

import pytest
import torch

10
from vllm.platforms import current_platform
11
from vllm.utils.torch_utils import set_random_seed
12
from vllm.v1.kv_offload.cpu.shared_offload_region import SharedOffloadRegion
13
from vllm.v1.kv_offload.mediums import CPULoadStoreSpec, GPULoadStoreSpec
14
15
16
17
18
from vllm.v1.kv_offload.spec import (
    CanonicalKVCacheRef,
    CanonicalKVCaches,
    CanonicalKVCacheTensor,
)
19
from vllm.v1.kv_offload.worker.cpu_gpu import CpuGpuOffloadingHandlers
20
21
22

NUM_GPU_BLOCKS = [64]
NUM_CPU_BLOCKS = [256]
23
24
25
GPU_PAGE_SIZES = [512, 1024]
BLOCK_SIZE_FACTORS = [1, 3]
NUM_TENSORS = [4]
26
SEEDS = [0]
27
28
DEVICE_TYPE = current_platform.device_type
DEVICES = [f"{DEVICE_TYPE}:0"]
29
30
31
32
33
NUM_MAPPINGS = [3]


@pytest.mark.parametrize("gpu_to_cpu", [True, False])
@pytest.mark.parametrize("num_mappings", NUM_MAPPINGS)
34
35
@pytest.mark.parametrize("gpu_page_size_bytes", GPU_PAGE_SIZES)
@pytest.mark.parametrize("block_size_factor", BLOCK_SIZE_FACTORS)
36
37
@pytest.mark.parametrize("num_gpu_blocks", NUM_GPU_BLOCKS)
@pytest.mark.parametrize("num_cpu_blocks", NUM_CPU_BLOCKS)
38
@pytest.mark.parametrize("num_tensors", NUM_TENSORS)
39
@pytest.mark.parametrize("seed", SEEDS)
40
@pytest.mark.parametrize("device", DEVICES)
41
@pytest.mark.parametrize("use_shared_memory", [False, True])
42
43
@torch.inference_mode()
def test_transfer(
44
    default_vllm_config,
45
46
    gpu_to_cpu: bool,
    num_mappings: int,
47
48
    gpu_page_size_bytes: int,
    block_size_factor: int,
49
50
    num_gpu_blocks: int,
    num_cpu_blocks: int,
51
    num_tensors: int,
52
53
    seed: int,
    device: str,
54
    use_shared_memory: bool,
55
) -> None:
56
    set_random_seed(seed)
57

58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
    # build CanonicalKVCacheTensor list: one per tensor
    kv_cache_tensors: list[CanonicalKVCacheTensor] = []
    for i in range(num_tensors):
        gpu_tensor = torch.randint(
            -128,
            127,
            (num_gpu_blocks, gpu_page_size_bytes),
            dtype=torch.int8,
            device=device,
        )
        kv_cache_tensors.append(
            CanonicalKVCacheTensor(
                tensor=gpu_tensor,
                page_size_bytes=gpu_page_size_bytes,
            )
73
        )
74

75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
    # one group containing all tensors, one data ref per tensor
    kv_cache_groups_data_refs: list[list[CanonicalKVCacheRef]] = [
        [
            CanonicalKVCacheRef(
                tensor_idx=i,
                page_size_bytes=gpu_page_size_bytes,
            )
            for i in range(num_tensors)
        ]
    ]

    kv_caches = CanonicalKVCaches(
        tensors=kv_cache_tensors,
        group_data_refs=kv_cache_groups_data_refs,
    )
90
91
92
93
94
95
96
97
98
99
100
101
102

    mmap_region: SharedOffloadRegion | None = None
    if use_shared_memory:
        cpu_page_size = gpu_page_size_bytes * num_tensors * block_size_factor
        mmap_region = SharedOffloadRegion(
            instance_id=str(uuid.uuid4()),
            total_size_bytes=num_cpu_blocks * cpu_page_size,
            num_blocks=num_cpu_blocks,
            rank=0,
            num_workers=1,
            cpu_page_size=cpu_page_size,
        )

103
    handlers = CpuGpuOffloadingHandlers(
104
105
        kv_caches=kv_caches,
        block_size_factor=block_size_factor,
106
        num_cpu_blocks=num_cpu_blocks,
107
        mmap_region=mmap_region,
108
    )
109
110

    # select block mappings
111
    gpu_blocks = random.sample(range(num_gpu_blocks), num_mappings * block_size_factor)
112
113
    cpu_blocks = random.sample(range(num_cpu_blocks), num_mappings)

114
115
116
117
118
119
120
121
122
    # expand cpu blocks to gpu-page granularity for uniform comparison:
    # each cpu block maps to block_size_factor consecutive sub-blocks
    cpu_blocks_expanded = [
        cpu_block * block_size_factor + j
        for cpu_block in cpu_blocks
        for j in range(block_size_factor)
    ]

    # maybe skip some GPU blocks to test reading from the middle of a CPU block
123
    if not gpu_to_cpu:
124
125
126
        blocks_to_skip = block_size_factor - 1
        gpu_blocks = gpu_blocks[blocks_to_skip:]
        cpu_blocks_expanded = cpu_blocks_expanded[blocks_to_skip:]
127
128
129

    # set transfer direction
    if gpu_to_cpu:
130
        handler = handlers.gpu_to_cpu_handler
131
132
133
134
        src_spec = GPULoadStoreSpec(gpu_blocks, group_sizes=(len(gpu_blocks),))
        dst_spec = CPULoadStoreSpec(cpu_blocks)
        dst_to_src = dict(zip(cpu_blocks_expanded, gpu_blocks))
        num_dst_sub_blocks = num_cpu_blocks * block_size_factor
135
    else:
136
        handler = handlers.cpu_to_gpu_handler
137
138
139
140
        src_spec = CPULoadStoreSpec(cpu_blocks)
        dst_spec = GPULoadStoreSpec(gpu_blocks, group_sizes=(len(gpu_blocks),))
        dst_to_src = dict(zip(gpu_blocks, cpu_blocks_expanded))
        num_dst_sub_blocks = num_gpu_blocks
141
142

    # clone src and dst tensors before transfer
143
144
    orig_src_tensors = [x.clone() for x in handler.src_tensors]
    orig_dst_tensors = [x.clone() for x in handler.dst_tensors]
145
146

    # call transfer function
147
    start_time = time.time()
148
    assert handler.transfer_async(1, (src_spec, dst_spec))
149
    assert set({x.job_id for x in handler._transfers}) == {1}
150
151
152
153
154
155

    # wait for transfer to complete
    end_time = time.time() + 10
    while time.time() < end_time:
        finished = handler.get_finished()
        if finished:
156
157
            assert finished[0].job_id == 1
            assert finished[0].success
158
159
            assert finished[0].transfer_type == (
                ("GPU", "CPU") if gpu_to_cpu else ("CPU", "GPU")
160
            )
161
162
            assert finished[0].transfer_size == (
                len(gpu_blocks) * handler.group_block_size_in_bytes[0]
163
164
165
            )
            assert finished[0].transfer_time > 0
            assert finished[0].transfer_time < (time.time() - start_time)
166
167
168
169
            break
        time.sleep(0.1)

    # verify src tensors did not change
170
    for orig_tensor, tensor in zip(orig_src_tensors, handler.src_tensors):
171
172
        assert torch.equal(orig_tensor, tensor)

173
174
175
176
177
178
179
    # verify dst tensors at gpu-page granularity.
    for src_tensor, dst_tensor, orig_dst_tensor in zip(
        handler.src_tensors,
        handler.dst_tensors,
        orig_dst_tensors,
    ):
        # view both GPU and CPU tensors as (n, gpu_page_size_bytes) for comparison.
180
181
182
        src_view = src_tensor.reshape(-1, gpu_page_size_bytes)
        dst_view = dst_tensor.reshape(-1, gpu_page_size_bytes)
        orig_dst_view = orig_dst_tensor.reshape(-1, gpu_page_size_bytes)
183
184
185
186
        for dst_sub_block in range(num_dst_sub_blocks):
            src_sub_block = dst_to_src.get(dst_sub_block)
            if src_sub_block is not None:
                expected = src_view[src_sub_block]
187
            else:
188
189
                expected = orig_dst_view[dst_sub_block]
            torch.testing.assert_close(dst_view[dst_sub_block].cpu(), expected.cpu())
190
191
192
193
194
195
196
197
198

    # Drop loop-variable refs so mmap_obj has no exported buffers at cleanup.
    del orig_tensor, tensor, src_tensor, dst_tensor, orig_dst_tensor
    del src_view, dst_view, orig_dst_view, expected

    handlers.cpu_to_gpu_handler.shutdown()
    handlers.gpu_to_cpu_handler.shutdown()
    if mmap_region:
        mmap_region.cleanup()