import tilelang
import tilelang.language as T
import torch
import tilelang.testing


# add decorator @tilelang.jit if you want to return a torch function
# @tilelang.jit
def tilelang_copy(M, N, block_M, block_N, dtype="float16"):
    @T.prim_func
    def main(
        A: T.Tensor((M, N), dtype),
        B: T.Tensor((M, N), dtype),
    ):
        # Initialize Kernel Context
        with T.Kernel(T.ceildiv(N, block_N), T.ceildiv(M, block_M), threads=128) as (bx, by):
            for i, j in T.Parallel(block_M, block_N):
                B[by * block_M + i, bx * block_N + j] = A[by * block_M + i, bx * block_N + j]

    return main


def run_tilelang_copy(M=1024, N=1024, block_M=128, block_N=128, dtype="float16"):
    program = tilelang_copy(M, N, block_M, block_N, dtype)
    kernel = tilelang.compile(
        program,
        out_idx=[1],
        target="cuda",
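        # Disable warp specialization and TMA lowering so the test exercises the plain copy path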
        pass_configs={
            tilelang.PassConfigKey.TL_DISABLE_WARP_SPECIALIZED: True,
            tilelang.PassConfigKey.TL_DISABLE_TMA_LOWER: True,
        },
    )
    a = torch.randn(M, N, device="cuda", dtype=getattr(torch, dtype))
    b = kernel(a)
    torch.testing.assert_close(b, a, rtol=1e-2, atol=1e-2)


def test_tilelang_copy():
    run_tilelang_copy(M=1024, N=1024, block_M=128, block_N=128)
    run_tilelang_copy(M=1024, N=576, block_M=32, block_N=576)
    run_tilelang_copy(M=1024, N=576, block_M=32, block_N=576, dtype="float")


def tilelang_copy_with_stride(M, N, NN, block_M, block_N, dtype="float16"):
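    # A is declared with row stride NN (NN > N), so the kernel reads an (M, N)
    # window out of a wider (M, NN) buffer while B stays contiguous.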
    @T.prim_func
    def main(
        A: T.StridedTensor((M, N), (NN, 1), dtype),
        B: T.Tensor((M, N), dtype),
    ):
        # Initialize Kernel Context
        with T.Kernel(T.ceildiv(N, block_N), T.ceildiv(M, block_M), threads=128) as (bx, by):
            for i, j in T.Parallel(block_M, block_N):
                B[by * block_M + i, bx * block_N + j] = A[by * block_M + i, bx * block_N + j]

    return main


def run_tilelang_copy_with_stride(M=1024, N=1024, NN=2048, block_M=128, block_N=128, dtype="float16"):
    if isinstance(NN, int):
        assert NN > N, "NN must be greater than N"
    program = tilelang_copy_with_stride(M, N, NN, block_M, block_N, dtype)
    kernel = tilelang.compile(
        program,
        out_idx=[1],
        target="cuda",
        pass_configs={
            tilelang.PassConfigKey.TL_DISABLE_WARP_SPECIALIZED: True,
            tilelang.PassConfigKey.TL_DISABLE_TMA_LOWER: True,
        },
    )
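    # When NN is a symbolic Var, pick a concrete row stride for the runtime input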
    if isinstance(NN, T.Var):
        NN = N * 2
    a = torch.randn(M, NN, device="cuda", dtype=getattr(torch, dtype))
    b = kernel(a[:, :N])
    torch.testing.assert_close(b, a[:, :N], rtol=1e-2, atol=1e-2)


def test_tilelang_copy_with_stride():
    run_tilelang_copy_with_stride(M=1024, N=1024, NN=2048, block_M=128, block_N=128)
    run_tilelang_copy_with_stride(M=1024, N=1024, NN=T.dynamic("NN"), block_M=128, block_N=128)


def tilelang_copy_bufferload(num_tokens, dtype="float16"):
    @T.prim_func
    def main(
        indices: T.Tensor((num_tokens,), "int32"),
        x: T.Tensor((num_tokens,), dtype),
    ):
        with T.Kernel(num_tokens, threads=32) as pid:
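            # Stage indices[pid] in a one-element local buffer via a scalar copy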
            idx = T.alloc_local([1], "int32")
            T.copy(indices[pid], idx[0])
            x[idx[0]] = x[idx[0]] + 1

    return main


def run_tilelang_copy_bufferload(num_tokens=128, dtype="float16"):
    program = tilelang_copy_bufferload(num_tokens, dtype)
    # test compilation only
    tilelang.compile(
        program,
        out_idx=[1],
        pass_configs={
            tilelang.PassConfigKey.TL_DISABLE_WARP_SPECIALIZED: True,
            tilelang.PassConfigKey.TL_DISABLE_TMA_LOWER: True,
        },
    )


def test_tilelang_copy_bufferload():
    run_tilelang_copy_bufferload(num_tokens=128)


def tilelang_copy_buffer_load_with_parallel(M, N, block_M, block_N, dtype="float16"):
    @T.prim_func
    def main(
        A: T.Tensor((M, N), dtype),
        B: T.Tensor((M, N), dtype),
    ):
        # Initialize Kernel Context
        with T.Kernel(T.ceildiv(N, block_N), T.ceildiv(M, block_M), threads=128) as (bx, by):
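            # Each parallel iteration copies a single element with T.copy instead of an assignment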
            for i, j in T.Parallel(block_M, block_N):
                T.copy(A[by * block_M + i, bx * block_N + j], B[by * block_M + i, bx * block_N + j])

    return main


def run_tilelang_copy_buffer_load_with_parallel(M=1024, N=1024, block_M=128, block_N=128, dtype="float16"):
    program = tilelang_copy_buffer_load_with_parallel(M, N, block_M, block_N, dtype)
    kernel = tilelang.compile(
        program,
        out_idx=[1],
        target="cuda",
        pass_configs={
            tilelang.PassConfigKey.TL_DISABLE_WARP_SPECIALIZED: True,
            tilelang.PassConfigKey.TL_DISABLE_TMA_LOWER: True,
        },
    )
    a = torch.randn(M, N, device="cuda", dtype=getattr(torch, dtype))
    b = kernel(a)
    torch.testing.assert_close(b, a, rtol=1e-2, atol=1e-2)


def test_tilelang_copy_buffer_load_with_parallel():
    run_tilelang_copy_buffer_load_with_parallel(M=1024, N=1024, block_M=128, block_N=128)


if __name__ == "__main__":
    tilelang.testing.main()