test_cpu_gpu.py 6.43 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import random
import time

import pytest
import torch

from vllm.platforms import current_platform
from vllm.v1.attention.backends.flash_attn import FlashAttentionBackend
from vllm.v1.attention.backends.flashinfer import FlashInferBackend
from vllm.v1.attention.backends.mla.flashattn_mla import FlashAttnMLABackend
from vllm.v1.kv_offload.mediums import CPULoadStoreSpec, GPULoadStoreSpec
from vllm.v1.kv_offload.worker.cpu_gpu import CpuGpuOffloadingHandler

NUM_GPU_BLOCKS = [64]
NUM_CPU_BLOCKS = [256]
GPU_BLOCK_SIZES = [16]
GPU_BLOCKS_PER_CPU_BLOCK = [1, 3]
HEAD_SIZES = [64]
NUM_HEADS = [8]
NUM_LAYERS = [4]
DTYPES = [torch.bfloat16]
SEEDS = [0]
25
CUDA_DEVICES = ["cuda:0"]
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
NUM_MAPPINGS = [3]


@pytest.mark.parametrize("gpu_to_cpu", [True, False])
@pytest.mark.parametrize("num_mappings", NUM_MAPPINGS)
@pytest.mark.parametrize("head_size", HEAD_SIZES)
@pytest.mark.parametrize("num_heads", NUM_HEADS)
@pytest.mark.parametrize("gpu_block_size", GPU_BLOCK_SIZES)
@pytest.mark.parametrize("gpu_blocks_per_cpu_block", GPU_BLOCKS_PER_CPU_BLOCK)
@pytest.mark.parametrize("num_gpu_blocks", NUM_GPU_BLOCKS)
@pytest.mark.parametrize("num_cpu_blocks", NUM_CPU_BLOCKS)
@pytest.mark.parametrize("num_layers", NUM_LAYERS)
@pytest.mark.parametrize("dtype", DTYPES)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", CUDA_DEVICES)
@torch.inference_mode()
def test_transfer(
    gpu_to_cpu: bool,
    num_mappings: int,
    head_size: int,
    num_heads: int,
    gpu_block_size: int,
    gpu_blocks_per_cpu_block: int,
    num_gpu_blocks: int,
    num_cpu_blocks: int,
    num_layers: int,
    dtype: torch.dtype,
    seed: int,
    device: str,
) -> None:
    current_platform.seed_everything(seed)

    # create per-layer GPU KV caches
59
    attn_backends_list = [FlashAttentionBackend, FlashInferBackend, FlashAttnMLABackend]
60
61
62
63

    gpu_caches = {}
    attn_backends = {}
    for i in range(num_layers):
64
        layer_name = f"layer {i}"
65
66
67
68
69

        attn_backend = attn_backends_list[i % len(attn_backends_list)]
        attn_backends[layer_name] = attn_backend

        gpu_cache_shape = attn_backend.get_kv_cache_shape(
70
71
72
            num_gpu_blocks, gpu_block_size, num_heads, head_size
        )
        gpu_caches[layer_name] = torch.rand(gpu_cache_shape, dtype=dtype, device=device)
73
74
75

    # create handler
    cpu_block_size = gpu_blocks_per_cpu_block * gpu_block_size
76
77
78
79
80
81
82
    handler = CpuGpuOffloadingHandler(
        attn_backends=attn_backends,
        gpu_block_size=gpu_block_size,
        cpu_block_size=cpu_block_size,
        num_cpu_blocks=num_cpu_blocks,
        gpu_caches=gpu_caches,
    )
83
84

    # select block mappings
85
86
87
    gpu_blocks = random.sample(
        range(num_gpu_blocks), num_mappings * gpu_blocks_per_cpu_block
    )
88
89
90
91
92
93
94
95
96
97
98
    cpu_blocks = random.sample(range(num_cpu_blocks), num_mappings)

    # convert cpu blocks to gpu block size
    cpu_blocks_in_gpu_block_size = []
    for cpu_block in cpu_blocks:
        base_block_id = cpu_block * gpu_blocks_per_cpu_block
        for i in range(gpu_blocks_per_cpu_block):
            cpu_blocks_in_gpu_block_size.append(i + base_block_id)

    # maybe skip a GPU block to test writing to the middle of a CPU block
    if gpu_to_cpu:
99
        gpu_blocks = gpu_blocks[gpu_blocks_per_cpu_block - 1 :]
100
        cpu_blocks_in_gpu_block_size = cpu_blocks_in_gpu_block_size[
101
102
            gpu_blocks_per_cpu_block - 1 :
        ]
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127

    # set transfer direction
    if gpu_to_cpu:
        src_kv_caches = handler.gpu_tensors
        dst_kv_caches = handler.cpu_tensors
        src_spec_class = GPULoadStoreSpec
        dst_spec_class = CPULoadStoreSpec
        src_blocks = gpu_blocks
        dst_blocks = cpu_blocks
        src_blocks_in_gpu_block_size = gpu_blocks
        dst_blocks_in_gpu_block_size = cpu_blocks_in_gpu_block_size
        dst_size_in_gpu_blocks = num_cpu_blocks * gpu_blocks_per_cpu_block
    else:
        src_kv_caches = handler.cpu_tensors
        dst_kv_caches = handler.gpu_tensors
        src_spec_class = CPULoadStoreSpec
        dst_spec_class = GPULoadStoreSpec
        src_blocks = cpu_blocks
        dst_blocks = gpu_blocks
        src_blocks_in_gpu_block_size = cpu_blocks_in_gpu_block_size
        dst_blocks_in_gpu_block_size = gpu_blocks
        dst_size_in_gpu_blocks = num_gpu_blocks

    # build dst -> src mapping
    dst_to_src = {}
128
129
130
    for src_block, dst_block in zip(
        src_blocks_in_gpu_block_size, dst_blocks_in_gpu_block_size
    ):
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
        dst_to_src[dst_block] = src_block

    # build transfer specs
    src_spec = src_spec_class(src_blocks)
    dst_spec = dst_spec_class(dst_blocks)

    # clone src and dst tensors before transfer
    orig_src_caches = [x.clone() for x in src_kv_caches]
    orig_dst_caches = [x.clone() for x in dst_kv_caches]

    # call transfer function
    assert handler.transfer_async(1, (src_spec, dst_spec))
    assert set(handler.transfer_events.keys()) == {1}

    # wait for transfer to complete
    end_time = time.time() + 10
    while time.time() < end_time:
        finished = handler.get_finished()
        if finished:
            assert finished == [(1, True)]
            break
        time.sleep(0.1)

    # verify src tensors did not change
    for orig_tensor, tensor in zip(orig_src_caches, src_kv_caches):
        assert torch.equal(orig_tensor, tensor)

    # verify dst tensors
    for dst_block in range(dst_size_in_gpu_blocks):
        src_block_candidate = dst_to_src.get(dst_block)
        for src_cache, dst_cache, orig_dst_cache, kv_dim in zip(
162
163
164
165
166
            src_kv_caches,
            dst_kv_caches,
            orig_dst_caches,
            handler.kv_dim_before_num_blocks,
        ):
167
168
169
170
171
172
173
            if kv_dim:
                # iterate over key, value
                for i in range(2):
                    if src_block_candidate is not None:
                        expected_value = src_cache[i][src_block_candidate]
                    else:
                        expected_value = orig_dst_cache[i][dst_block]
174
175
176
                    torch.testing.assert_close(
                        dst_cache[i][dst_block].cpu(), expected_value.cpu()
                    )
177
178
179
180
181
            else:
                if src_block_candidate is not None:
                    expected_value = src_cache[src_block_candidate]
                else:
                    expected_value = orig_dst_cache[dst_block]
182
183
184
                torch.testing.assert_close(
                    dst_cache[dst_block].cpu(), expected_value.cpu()
                )