# test_multi_tensor.py 10.1 KB
# Newer Older
1
# Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
4
5
6
7
#
# See LICENSE for license information.

import pytest
import torch

8
import transformer_engine.pytorch
9
import transformer_engine_torch as tex
10
from transformer_engine.pytorch import is_mxfp8_available
11
12
from transformer_engine.pytorch.optimizers import MultiTensorApply

13
from references.quantize_scale_calc import scale_from_amax_tensor
14
15


16
17
18
19
20
21
22
23
24
25
26
# Pairs of 1-D tensor lengths ``(sizea, sizeb)``; each test builds its tensor
# lists from one tensor of each length.
input_size_pairs = [
    (7777 * 77, 555 * 555),
    (777, 555),
    (555, 2048 * 32 + 1),
    (2048 * 32 + 1, 555),
    (555, 2048 * 32),
    (2048 * 32, 555),
    (33333, 555),
    (555, 33333),
]
# Appliers built with three different constructor arguments.
# NOTE(review): the argument is presumably the chunk size used when launching
# the multi-tensor kernels -- confirm against MultiTensorApply.
appliers = [MultiTensorApply(2048 * 32), MultiTensorApply(333), MultiTensorApply(33333)]
# Availability flag plus a human-readable reason, used to skip the MXFP8 test.
mxfp8_available, reason_for_no_mxfp8 = is_mxfp8_available(return_reason=True)


@pytest.mark.parametrize("input_size_pair", input_size_pairs)
@pytest.mark.parametrize("applier", appliers)
@pytest.mark.parametrize("repeat", [1, 55])
@pytest.mark.parametrize("in_type", [torch.float32, torch.float16, torch.bfloat16])
@pytest.mark.parametrize("out_type", [torch.float32, torch.float16, torch.bfloat16])
@pytest.mark.parametrize("inplace", [False, True])
def test_multi_tensor_scale(input_size_pair, applier, repeat, in_type, out_type, inplace):
    """Check ``tex.multi_tensor_scale`` results and its overflow flag.

    Tensors filled with ``scale`` are multiplied by ``1 / scale`` and must come
    out as 1.0 with the overflow buffer left at zero.  Afterwards a NaN or Inf
    is written into one input element (first tensor, last tensor, and one in
    the middle) and the overflow buffer must become non-zero.

    Args:
        input_size_pair: ``(sizea, sizeb)`` lengths of the two base tensors.
        applier: callable that launches the multi-tensor op.
        repeat: number of ``(a, b)`` pairs placed in each tensor list.
        in_type: dtype of the input tensors.
        out_type: dtype of the output tensors.
        inplace: when true, the input list *is* the output list.
    """
    if inplace is True and (out_type is not in_type):
        pytest.skip("inplace=True and out_type != in_type is not supported.")
    elif (in_type == torch.float16 and out_type == torch.bfloat16) or (
        in_type == torch.bfloat16 and out_type == torch.float16
    ):
        pytest.skip("float16 to bfloat16 is not necessary and vice versa.")

    device = torch.device("cuda")
    scale = 4.0
    overflow_buf = torch.zeros(1, dtype=torch.int32, device=device)
    ref = torch.tensor([1.0], dtype=torch.float32, device=device)
    sizea, sizeb = input_size_pair

    def build_lists():
        """Return fresh ``(in_list, out_list)`` with every element equal to ``scale``."""
        a = torch.full([sizea], scale, dtype=torch.float32, device=device)
        b = torch.full([sizeb], scale, dtype=torch.float32, device=device)

        out_list = []
        for _ in range(repeat):
            out_list += [a.clone().to(out_type), b.clone().to(out_type)]

        # In-place mode aliases the inputs to the outputs on purpose.
        if inplace:
            in_list = out_list
        else:
            in_list = [out.clone().to(in_type) for out in out_list]
        return in_list, out_list

    def run_scale(in_list, out_list):
        """Launch the scale kernel with factor ``1 / scale``."""
        applier(tex.multi_tensor_scale, overflow_buf, [in_list, out_list], 1.0 / scale)

    def downscale():
        """Scaling finite inputs yields 1.0 everywhere and no overflow."""
        overflow_buf.zero_()
        in_list, out_list = build_lists()
        run_scale(in_list, out_list)

        assert all(torch.allclose(out, ref.to(out_type)) for out in out_list)
        assert overflow_buf.item() == 0

    def find_inf(t, ind, val):
        """Writing ``val`` at ``in_list[t][ind]`` must raise the overflow flag."""
        overflow_buf.zero_()
        in_list, out_list = build_lists()

        # First pass runs on clean data; the flag is reset before the poisoned pass.
        run_scale(in_list, out_list)

        overflow_buf.zero_()
        in_list[t][ind] = val
        run_scale(in_list, out_list)
        assert overflow_buf.item() > 0

    downscale()
    # First element of the first tensor.
    find_inf(0, 0, float("nan"))
    # Last element of the last tensor (a "b"-sized tensor).
    find_inf(2 * repeat - 1, sizeb - 1, float("inf"))
    # Middle element of an "a"-sized tensor near the middle of the list.
    find_inf(2 * (repeat // 2), sizea // 2, float("inf"))


@pytest.mark.parametrize("input_size_pair", input_size_pairs)
@pytest.mark.parametrize("applier", appliers)
@pytest.mark.parametrize("repeat", [1, 55])
@pytest.mark.parametrize("in_type", [torch.float32, torch.float16, torch.bfloat16])
@pytest.mark.parametrize("per_tensor", [False, True])
def test_multi_tensor_l2norm(input_size_pair, applier, repeat, in_type, per_tensor):
    """Compare ``tex.multi_tensor_l2norm`` against ``torch.Tensor.norm``.

    The inputs are ``repeat`` copies of two constant tensors.  The global norm
    is checked against the norm of a single constant FP32 tensor with the same
    total element count; with ``per_tensor`` the per-tensor norms are checked
    against the norms of the two base tensors.

    Args:
        input_size_pair: ``(sizea, sizeb)`` lengths of the two base tensors.
        applier: callable that launches the multi-tensor op.
        repeat: number of ``(a, b)`` pairs in the input list.
        in_type: dtype of the input tensors.
        per_tensor: whether to request and verify per-tensor norms.
    """
    sizea, sizeb = input_size_pair
    device = torch.device("cuda")
    val = 4.0
    overflow_buf = torch.zeros(1, dtype=torch.int32, device=device)

    a = torch.full([sizea], val, dtype=torch.float32, device=device)
    b = torch.full([sizeb], val, dtype=torch.float32, device=device)

    in_list = []
    for _ in range(repeat):
        in_list += [a.clone().to(in_type), b.clone().to(in_type)]

    if per_tensor:
        norm, norm_per_tensor = applier(tex.multi_tensor_l2norm, overflow_buf, [in_list], True)
        normab = torch.cat((a.norm().view(1), b.norm().view(1)))
        # One row per (a, b) pair so the expected pair broadcasts over all rows.
        norm_per_tensor = norm_per_tensor.view(-1, 2)
    else:
        norm, _ = applier(tex.multi_tensor_l2norm, overflow_buf, [in_list], False)

    reference = torch.full(
        [(sizea + sizeb) * repeat], val, dtype=torch.float32, device=device
    ).norm()

    torch.testing.assert_close(norm, reference.broadcast_to(norm.shape))
    if per_tensor:
        torch.testing.assert_close(norm_per_tensor, normab.broadcast_to(norm_per_tensor.shape))
    assert overflow_buf.item() == 0


@pytest.mark.parametrize("input_size_pair", input_size_pairs)
@pytest.mark.parametrize("applier", appliers)
@pytest.mark.parametrize("repeat", [1, 55])
@pytest.mark.parametrize("in_type", [torch.float32, torch.float16, torch.bfloat16])
@pytest.mark.parametrize("per_tensor", [False, True])
def test_multi_tensor_unscale_l2norm(input_size_pair, applier, repeat, in_type, per_tensor):
    """Compare ``tex.multi_tensor_unscale_l2norm`` against a PyTorch reference.

    The inputs are ``repeat`` copies of two constant tensors.  The expected
    global norm is that of a constant tensor holding ``val * inv_scale``; with
    ``per_tensor`` the per-tensor norms are checked against the norms of the
    two base tensors multiplied by ``inv_scale``.

    Args:
        input_size_pair: ``(sizea, sizeb)`` lengths of the two base tensors.
        applier: callable that launches the multi-tensor op.
        repeat: number of ``(a, b)`` pairs in the input list.
        in_type: dtype of the input tensors.
        per_tensor: whether to request and verify per-tensor norms.
    """
    sizea, sizeb = input_size_pair
    device = torch.device("cuda")
    val = 4.0
    inv_scale = 0.5
    inv_scale_cuda = torch.tensor([inv_scale], dtype=torch.float32, device=device)
    overflow_buf = torch.zeros(1, dtype=torch.int32, device=device)

    a = torch.full([sizea], val, dtype=torch.float32, device=device)
    b = torch.full([sizeb], val, dtype=torch.float32, device=device)

    in_list = []
    for _ in range(repeat):
        in_list += [a.clone().to(in_type), b.clone().to(in_type)]

    if per_tensor:
        norm, norm_per_tensor = applier(
            tex.multi_tensor_unscale_l2norm,
            overflow_buf,
            [in_list],
            inv_scale_cuda,
            True,
        )
        normab = torch.cat(((a * inv_scale).norm().view(1), (b * inv_scale).norm().view(1)))
        # One row per (a, b) pair so the expected pair broadcasts over all rows.
        norm_per_tensor = norm_per_tensor.view(-1, 2)
    else:
        # Pass False here so the non-per-tensor kernel path is actually
        # exercised; passing True in both branches left it untested.
        norm, _ = applier(
            tex.multi_tensor_unscale_l2norm,
            overflow_buf,
            [in_list],
            inv_scale_cuda,
            False,
        )

    reference = torch.full(
        [(sizea + sizeb) * repeat], val * inv_scale, dtype=torch.float32, device=device
    ).norm()

    torch.testing.assert_close(norm, reference.broadcast_to(norm.shape))
    if per_tensor:
        torch.testing.assert_close(norm_per_tensor, normab.broadcast_to(norm_per_tensor.shape))
    assert overflow_buf.item() == 0


@pytest.mark.parametrize("input_size_pair", input_size_pairs + [(1, 1)])
@pytest.mark.parametrize("applier", appliers)
@pytest.mark.parametrize("repeat", [1, 55])
@pytest.mark.parametrize("fp8_dtype", [torch.float8_e4m3fn, torch.float8_e5m2])
@pytest.mark.parametrize("pow_2_scales", [False, True])
@pytest.mark.parametrize("epsilon", [0.0, 100.0])
def test_multi_tensor_compute_scale_and_scale_inv(
    input_size_pair, applier, repeat, fp8_dtype, pow_2_scales, epsilon
):
    """Check ``tex.multi_tensor_compute_scale_and_scale_inv`` bit-for-bit.

    Random non-negative amax tensors are fed to the kernel and the resulting
    ``scale`` / ``scale_inv`` tensors must match ``scale_from_amax_tensor``
    exactly (``rtol=0, atol=0``).

    Args:
        input_size_pair: ``(sizea, sizeb)`` lengths of the two amax tensors.
        applier: callable that launches the multi-tensor op.
        repeat: number of ``(a, b)`` pairs in the amax list.
        fp8_dtype: FP8 dtype whose ``finfo.max`` is passed to the kernel.
        pow_2_scales: forwarded to both the kernel and the reference.
        epsilon: forwarded to the kernel and, as ``eps``, to the reference.
    """
    sizea, sizeb = input_size_pair
    device = torch.device("cuda")
    overflow_buf = torch.zeros(1, dtype=torch.int32, device=device)
    a = torch.randn([sizea], dtype=torch.float32, device=device).abs()
    b = torch.randn([sizeb], dtype=torch.float32, device=device).abs()
    max_fp8 = torch.finfo(fp8_dtype).max

    amax_list = []
    for _ in range(repeat):
        amax_list += [a.clone(), b.clone()]

    # Output buffers; their initial contents are irrelevant.
    scale_list = [torch.empty_like(x) for x in amax_list]
    scale_inv_list = [torch.empty_like(x) for x in amax_list]

    applier(
        tex.multi_tensor_compute_scale_and_scale_inv,
        overflow_buf,
        [amax_list, scale_list, scale_inv_list],
        max_fp8,
        pow_2_scales,
        epsilon,
    )

    for amax, scale, scale_inv in zip(amax_list, scale_list, scale_inv_list):
        scale_ref, scale_inv_ref, _ = scale_from_amax_tensor(
            torch.float32, amax, fp8_dtype, eps=epsilon, pow_2_scales=pow_2_scales
        )
        torch.testing.assert_close(scale, scale_ref, rtol=0, atol=0)
        torch.testing.assert_close(scale_inv, scale_inv_ref, rtol=0, atol=0)


@pytest.mark.skipif(not mxfp8_available, reason=reason_for_no_mxfp8)
@pytest.mark.parametrize("input_size_pair", input_size_pairs + [(1, 1)])
@pytest.mark.parametrize("applier", appliers)
@pytest.mark.parametrize("repeat", [1, 55])
def test_multi_tensor_compute_scale_inv_e8m0(input_size_pair, applier, repeat):
    """Check ``tex.multi_tensor_compute_scale_inv_e8m0`` against a bit-level reference.

    Random non-negative BF16 amax tensors are passed to the kernel, which
    fills ``uint8`` outputs.  The reference divides each amax (as FP32) by the
    largest ``float8_e4m3fn`` value, reinterprets the result as a 32-bit
    integer, takes the bits above the low 23 as the exponent and adds one when
    the rounding condition below holds.

    Args:
        input_size_pair: ``(sizea, sizeb)`` lengths of the two amax tensors.
        applier: callable that launches the multi-tensor op.
        repeat: number of ``(a, b)`` pairs in the amax list.
    """
    cuda = torch.device("cuda")
    len_a, len_b = input_size_pair
    a = torch.randn([len_a], dtype=torch.bfloat16, device=cuda).abs()
    b = torch.randn([len_b], dtype=torch.bfloat16, device=cuda).abs()

    # Interleaved as a, b, a, b, ... with one pair per repetition.
    amax_list = [src.clone() for _ in range(repeat) for src in (a, b)]
    scale_inv_list = [torch.empty_like(amax).to(torch.uint8) for amax in amax_list]

    # This op is invoked without an overflow buffer.
    applier(tex.multi_tensor_compute_scale_inv_e8m0, None, [amax_list, scale_inv_list])

    fp8_max = torch.finfo(torch.float8_e4m3fn).max
    for amax, scale_inv in zip(amax_list, scale_inv_list):
        bits = (amax.float() / fp8_max).view(torch.int)
        exponent = bits // 2**23
        mantissa = bits & 0x7FFFFF

        # Bump the exponent when any mantissa bit is set, unless the exponent
        # is already 0xFE, or it is zero with a mantissa of at most 0x400000.
        has_mantissa = mantissa > 0
        not_at_cap = exponent != 0xFE
        zero_exp_small_mantissa = (exponent == 0) & (mantissa <= 0x400000)
        bump = (has_mantissa & not_at_cap) & ~zero_exp_small_mantissa

        expected = (exponent + bump.to(torch.int)).to(torch.uint8)
        torch.testing.assert_close(expected, scale_inv)