# test_tilelang_language_clamp.py
import tilelang.testing
from tilelang import language as T


def clamp_within_bounds(
    N,
    block_N,
    dtype,
    min_val=None,
    max_val=None,
):
    """Build a TileLang kernel that clamps a 1-D tensor to fixed bounds.

    Each block copies a ``block_N``-element tile of ``A`` into a shared
    buffer, applies ``T.clamp`` element-wise and copies the tile to ``B``.

    Args:
        N: Number of elements in the input tensor ``A`` and output ``B``.
        block_N: Tile size per block; also passed as the thread count.
        dtype: Element type of ``A``, ``B`` and the shared buffer.
        min_val: Lower bound forwarded to ``T.clamp``.
        max_val: Upper bound forwarded to ``T.clamp``.

    Returns:
        The ``T.prim_func`` that describes the kernel.
    """
    import tilelang.language as T

    @T.prim_func
    def main(
        A: T.Tensor((N,), dtype),
        B: T.Tensor((N,), dtype),
    ):
        # ceildiv(N, block_N) blocks; `bx` selects the tile this block handles.
        with T.Kernel(T.ceildiv(N, block_N), threads=block_N) as bx:
            A_shared = T.alloc_shared([block_N], dtype)
            T.copy(A[bx * block_N], A_shared)
            # Clamp in place inside the shared buffer before writing back.
            for i in T.Parallel(block_N):
                A_shared[i] = T.clamp(A_shared[i], min_val=min_val, max_val=max_val)
            T.copy(A_shared, B[bx * block_N])

    return main


def run_clamp(
    N,
    block_N,
    dtype,
    min=None,
    max=None,
):
    """Compile the fixed-bounds clamp kernel and check it against torch.

    Args:
        N: Number of elements in the tensors.
        block_N: Tile size / thread count passed to the kernel builder.
        dtype: Element type passed to the kernel builder.
        min: Lower clamp bound, used by both the kernel and the reference.
        max: Upper clamp bound, used by both the kernel and the reference.

    Note:
        ``min`` and ``max`` shadow the builtins inside this function; the
        names are kept because they are part of the existing signature.
    """
    program = clamp_within_bounds(N, block_N, dtype, min, max)

    # Parameter 1 (B) is the kernel output.
    kernel = tilelang.compile(program, out_idx=[1])
    profiler = kernel.get_profiler()

    def ref_program(A):
        import torch

        return torch.clamp(A, min, max)

    profiler.assert_allclose(ref_program, atol=1e-2, rtol=1e-2)


def clamp_value_range(
    N,
    block_N,
    dtype,
):
    """Build a TileLang kernel that clamps each tile to half its own range.

    For every ``block_N``-wide tile of the ``(1, N)`` input, the kernel
    reduces the tile to its minimum and maximum and clamps each element to
    ``[0.5 * tile_min, 0.5 * tile_max]``.

    Args:
        N: Length of the second dimension of ``A`` and ``B``.
        block_N: Tile size per block; also passed as the thread count.
        dtype: Element type of the tensors and the fragment buffers.

    Returns:
        The ``T.prim_func`` that describes the kernel.
    """
    import tilelang.language as T

    @T.prim_func
    def main(
        A: T.Tensor((1, N), dtype),
        B: T.Tensor((1, N), dtype),
    ):
        with T.Kernel(T.ceildiv(N, block_N), threads=block_N) as bx:
            A_frag = T.alloc_fragment([1, block_N], dtype=dtype)
            min_frag = T.alloc_fragment([1], dtype=dtype)
            max_frag = T.alloc_fragment([1], dtype=dtype)

            T.copy(A[0, bx * block_N], A_frag)
            # The reductions see only this block's tile, so the bounds are
            # per tile rather than global over all N elements.
            T.reduce_min(A_frag, min_frag, dim=1)
            T.reduce_max(A_frag, max_frag, dim=1)
            for i in T.Parallel(block_N):
                A_frag[0, i] = T.clamp(A_frag[0, i], min_frag[0] * 0.5, max_frag[0] * 0.5)
            T.copy(A_frag, B[0, bx * block_N])

    return main


def run_clamp_value_range(
    N,
    block_N,
    dtype,
):
    """Compile the value-range clamp kernel and compare it with torch.

    The input holds random integer values in ``[-5, 5)`` cast to ``dtype``.
    The reference works tile by tile, because the kernel derives its clamp
    bounds from each ``block_N``-wide tile rather than from the whole tensor.

    Args:
        N: Length of the second dimension of the ``(1, N)`` input.
        block_N: Tile size / thread count passed to the kernel builder.
        dtype: TileLang dtype; converted to a torch dtype via ``as_torch()``.
    """
    program = clamp_value_range(
        N,
        block_N,
        dtype,
    )
    # Parameter 1 (B) is the kernel output, so the kernel is called with A only.
    kernel = tilelang.compile(program, out_idx=[1])

    import torch

    # Convert the TileLang dtype to the matching torch.dtype.
    torch_dtype = dtype.as_torch()

    def ref_program(A):
        # Clamp every block_N-wide tile to half of its own min/max. A global
        # min/max would match the kernel only if each tile happened to
        # contain the global extremes.
        clamped_tiles = [
            torch.clamp(tile, torch.min(tile) * 0.5, torch.max(tile) * 0.5)
            for tile in torch.split(A, block_N, dim=1)
        ]
        return torch.cat(clamped_tiles, dim=1)

    A = torch.randint(-5, 5, (1, N)).cuda().to(dtype=torch_dtype)
    B = kernel(A)
    ref_b = ref_program(A)
    torch.testing.assert_close(B, ref_b)


def test_clamp():
    """Run the fixed-bounds and value-range clamp checks for fp16 and fp32."""
    # Fixed scalar bounds.
    run_clamp(1024, 128, T.float16, -0.05, 0.05)
    run_clamp(1024, 128, T.float32, -0.06, 0.05)
    # Bounds derived from the data inside the kernel.
    run_clamp_value_range(1024, 128, T.float16)
    run_clamp_value_range(1024, 128, T.float32)


# Allow running this test file directly as a script.
if __name__ == "__main__":
    tilelang.testing.main()