kernels.py 1.79 KB
Newer Older
Woosuk Kwon's avatar
Woosuk Kwon committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import random

import torch

from cacheflow.ops import reshape_and_cache


def test_reshape_and_cache(
    num_tokens: int,
    num_heads: int,
    head_size: int,
    block_size: int,
    num_blocks: int,
    dtype: torch.dtype,
) -> None:
    """Check `reshape_and_cache` against a pure-PyTorch reference.

    Random key/value tensors are written into randomly initialised caches by
    the op under test. The same writes are then replayed with plain tensor
    indexing on clones of the original caches, and both results are compared.
    All tensors are created on the 'cuda' device.

    Args:
        num_tokens: Number of tokens to write; each one gets a distinct slot.
        num_heads: Size of dim 1 of the key/value tensors.
        head_size: Size of dim 2 of the key/value tensors. Must be a multiple
            of ``x = 16 // element_size(dtype)``.
        block_size: Number of slots per cache block.
        num_blocks: Number of cache blocks.
        dtype: Element type of key, value and both caches.

    Raises:
        ValueError: If ``head_size`` is not a multiple of ``x``, or (from
            `random.sample`) if ``num_tokens`` exceeds the number of slots.
        AssertionError: If a cache written by the op differs from the
            reference.
    """
    # x is the number of elements of `dtype` that fit in 16 bytes; the key
    # cache stores head_size as (head_size // x) chunks of x elements.
    x = 16 // torch.tensor([], dtype=dtype).element_size()
    if head_size % x != 0:
        # Fail before the op runs: the key cache below would otherwise be
        # sized with a truncated head_size // x.
        raise ValueError(
            f'head_size ({head_size}) must be a multiple of {x} for {dtype}')

    num_slots = block_size * num_blocks
    # Keep the slot ids as Python ints too, so the reference loop below does
    # not have to read them back from the device one by one.
    slots = random.sample(range(num_slots), num_tokens)
    slot_mapping = torch.tensor(slots, dtype=torch.int, device='cuda')

    kv_shape = (num_tokens, num_heads, head_size)
    key = torch.randn(size=kv_shape, dtype=dtype, device='cuda')
    value = torch.randn(size=kv_shape, dtype=dtype, device='cuda')

    key_cache_shape = (num_blocks, num_heads, head_size // x, block_size, x)
    key_cache = torch.randn(size=key_cache_shape, dtype=dtype, device='cuda')
    cloned_key_cache = key_cache.clone()

    value_cache_shape = (num_blocks, num_heads, block_size, head_size)
    value_cache = torch.randn(
        size=value_cache_shape, dtype=dtype, device='cuda')
    cloned_value_cache = value_cache.clone()

    reshape_and_cache(key, value, key_cache, value_cache, slot_mapping)

    # Reference: replay the same writes on the clones with plain indexing.
    # The reshape does not depend on the token, so do it once.
    reshaped_key = key.reshape(num_tokens, num_heads, head_size // x, x)
    for i, slot in enumerate(slots):
        block_idx, block_offset = divmod(slot, block_size)
        cloned_key_cache[block_idx, :, :, block_offset, :] = reshaped_key[i]
        cloned_value_cache[block_idx, :, block_offset, :] = value[i]

    assert torch.allclose(key_cache, cloned_key_cache)
    assert torch.allclose(value_cache, cloned_value_cache)


@torch.no_grad()
def test_kernels():
    """Run the reshape-and-cache check once with a small half-precision setup.

    Gradient tracking is disabled for the whole run via the decorator.
    """
    test_reshape_and_cache(
        num_tokens=3,
        num_heads=2,
        head_size=16,
        block_size=2,
        num_blocks=2,
        dtype=torch.half,
    )


# Run the kernel tests when this file is executed directly as a script.
if __name__ == '__main__':
    test_kernels()