test_compressed_tensors.py 29.4 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
"""Test model set-up and weight loading for llmcompressor-quantized models.

Run `pytest tests/quantization/test_compressed_tensors.py`.
"""
7

8
import pytest
9
import torch
10
import os
11

12
from compressed_tensors.quantization import QuantizationType
13

14
from tests.models.utils import check_logprobs_close
15
from vllm.model_executor.layers.fused_moe import UnquantizedFusedMoEMethod
16
from vllm.model_executor.layers.quantization.compressed_tensors.compressed_tensors import (  # noqa: E501
17
18
19
20
21
22
23
24
25
26
27
    CompressedTensors24,
    CompressedTensorsLinearMethod,
    CompressedTensorsW4A4Fp4,
    CompressedTensorsW4A8Fp8,
    CompressedTensorsW4A16Fp4,
    CompressedTensorsW4A16Sparse24,
    CompressedTensorsW8A8Fp8,
    CompressedTensorsW8A8Int8,
    CompressedTensorsW8A16Fp8,
    CompressedTensorsWNA16,
)
28
from vllm.model_executor.layers.quantization.input_quant_fp8 import QuantFP8
29
from vllm.model_executor.layers.quantization.utils.fp8_utils import W8A8BlockFp8LinearOp
30
from vllm.model_executor.layers.quantization.utils.quant_utils import (
31
32
    cutlass_fp4_supported,
)
33
from vllm.model_executor.layers.quantization.utils.w8a8_utils import (
34
35
    sparse_cutlass_supported,
)
zhuwenwen's avatar
zhuwenwen committed
36
from vllm.platforms import current_platform
37
from ..utils import models_path_prefix
38

39
40
41
42
43
# Checkpoints usable with the ROCm AITER INT8 GEMM path.
# AITER handles only two INT8 layouts: per-channel x per-channel and
# per-tensor x per-tensor. Mixed-precision matmuls and mixed quantization
# schemes are not supported.
ROCM_AITER_SUPPORTED_INT8_MODEL = [
    "neuralmagic/Llama-3.2-1B-quantized.w8a8",
    "nm-testing/tinyllama-oneshot-w8a8-channel-dynamic-token-v2",
]

# Checkpoints usable with TritonScaledMMLinearKernel on ROCm; that kernel
# supports symmetric quantization only.
ROCM_TRITON_SCALED_MM_SUPPORTED_INT8_MODEL = [
    "nm-testing/tinyllama-oneshot-w8w8-test-static-shape-change",
    "nm-testing/tinyllama-oneshot-w8-channel-a8-tensor",
    "neuralmagic/Llama-3.2-1B-quantized.w8a8",
    "nm-testing/tinyllama-oneshot-w8a8-dynamic-token-v2",
    "nm-testing/tinyllama-oneshot-w8a8-channel-dynamic-token-v2",
]

56

57
@pytest.fixture(scope="function", autouse=True)
def enable_pickle(monkeypatch):
    """Turn on insecure serialization for every test in this module.

    `LLM.apply_model` requires pickling a function, so the environment
    variable that permits it is set through `monkeypatch` for each test.
    """
    env_var, enabled = "VLLM_ALLOW_INSECURE_SERIALIZATION", "1"
    monkeypatch.setenv(env_var, enabled)
61
62


63
64
@pytest.mark.parametrize(
    "model_args",
    [
        (
            os.path.join(
                models_path_prefix,
                "nm-testing/tinyllama-oneshot-w8w8-test-static-shape-change",
            ),
            "tensor",
            QuantizationType.INT,
            2560,
            True,
        ),
        (
            os.path.join(
                models_path_prefix,
                "nm-testing/asym-w8w8-int8-static-per-tensor-tiny-llama",
            ),
            "tensor",
            QuantizationType.INT,
            2560,
            False,
        ),
    ],
)
def test_compressed_tensors_w8a8_static_setup(vllm_runner, model_args):
    """Check the layer set-up of W8A8 INT8 models with static input scales.

    Args:
        vllm_runner: Fixture used as a context manager that loads the model.
        model_args: Tuple of (model path, expected scheme strategy,
            quantization type, expected first dimension of the qkv weight
            scale, whether the input zero points are expected to be absent).
    """
    model_path, strategy, _quant_type, shape_0, is_symmetric = model_args

    # `model_path` is prefixed with `models_path_prefix`, whereas the ROCm
    # allow-list stores bare model IDs. Match on the trailing model ID;
    # a plain `in` test would never match and skip every model on ROCm.
    if current_platform.is_rocm() and not model_path.endswith(
        tuple(ROCM_TRITON_SCALED_MM_SUPPORTED_INT8_MODEL)
    ):
        pytest.skip(f"Skip model {model_path} as it is not supported on ROCm.")

    with vllm_runner(model_path, enforce_eager=True) as llm:

        def check_model(model):
            layer = model.model.layers[0]

            qkv_proj = layer.self_attn.qkv_proj
            o_proj = layer.self_attn.o_proj
            gate_up_proj = layer.mlp.gate_up_proj
            down_proj = layer.mlp.down_proj

            # assert zp for symmetric and asymmetric cases
            def zp_valid(zp: torch.Tensor | None):
                if is_symmetric:
                    return zp is None

                return zp is not None and zp.dtype is torch.int32

            assert zp_valid(qkv_proj.input_zero_point)
            assert zp_valid(o_proj.input_zero_point)
            assert zp_valid(gate_up_proj.input_zero_point)
            assert zp_valid(down_proj.input_zero_point)

            assert isinstance(qkv_proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(o_proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(gate_up_proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(down_proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(qkv_proj.scheme, CompressedTensorsW8A8Int8)

            assert qkv_proj.scheme.strategy == strategy
            assert qkv_proj.scheme.is_static_input_scheme
            expected_type = torch.int8

            assert qkv_proj.weight.dtype is expected_type
            assert o_proj.weight.dtype is expected_type
            assert gate_up_proj.weight.dtype is expected_type

            if qkv_proj.scheme.strategy == "tensor":
                # Make sure it is a channelwise buffer
                # After running process_weights_after_loading
                assert len(qkv_proj.weight_scale.shape) == 2
                assert qkv_proj.weight_scale.shape[0] == shape_0
                assert qkv_proj.weight_scale.shape[1] == 1
            assert qkv_proj.weight_scale.dtype is torch.float32
            assert qkv_proj.input_scale.dtype is torch.float32

        llm.apply_model(check_model)

        # Smoke-check that the loaded model can still generate.
        output = llm.generate_greedy(["Hello my name is"], max_tokens=4)
        assert output

141

142
143
144
@pytest.mark.parametrize(
    "model_path",
    [
        os.path.join(models_path_prefix, "neuralmagic/Llama-3.2-1B-quantized.w8a8"),
    ],
)
@pytest.mark.parametrize("max_tokens", [4])
@pytest.mark.parametrize("num_logprobs", [10])
@pytest.mark.parametrize(
    "use_aiter", [True, False] if current_platform.is_rocm() else [False]
)
def test_compressed_tensors_w8a8_logprobs(
    hf_runner,
    vllm_runner,
    example_prompts,
    model_path,
    max_tokens,
    num_logprobs,
    use_aiter,
    monkeypatch,
):
    """Compare greedy logprobs of a W8A8 model between HF and vLLM.

    Args:
        hf_runner: Fixture that loads the model for the reference run.
        vllm_runner: Fixture that loads the model in vLLM.
        example_prompts: Prompts fed to both runners.
        model_path: Path of the checkpoint under test.
        max_tokens: Number of tokens to generate per prompt.
        num_logprobs: Number of logprobs requested per generated token.
        use_aiter: Whether to enable the ROCm AITER path for this run.
        monkeypatch: Fixture used to set the AITER environment variable.
    """
    # `model_path` is prefixed with `models_path_prefix`, whereas the ROCm
    # allow-lists store bare model IDs. Match on the trailing model ID;
    # a plain `in` test would never match and skip every model on ROCm.
    if current_platform.is_rocm() and not model_path.endswith(
        tuple(ROCM_TRITON_SCALED_MM_SUPPORTED_INT8_MODEL)
    ):
        pytest.skip(f"Skip model {model_path} as it is not supported on ROCm.")

    if use_aiter:
        if not model_path.endswith(tuple(ROCM_AITER_SUPPORTED_INT8_MODEL)):
            pytest.skip(f"Skip model {model_path} as it is not support by aiter.")
        # this will enable VLLM_ROCM_USE_AITER_LINEAR
        monkeypatch.setenv("VLLM_ROCM_USE_AITER", "1")

    dtype = "bfloat16"

    # skip language translation prompt for the static per tensor models
    if model_path in (
        os.path.join(
            models_path_prefix,
            "nm-testing/Meta-Llama-3-8B-Instruct-W8A8-Static-Per-Tensor-Sym",
        ),
        os.path.join(
            models_path_prefix,
            "nm-testing/Meta-Llama-3-8B-Instruct-W8A8-Static-Per-Tensor-Asym",
        ),
    ):
        example_prompts = example_prompts[0:-1]

    with hf_runner(model_path, dtype=dtype) as hf_model:
        hf_outputs = hf_model.generate_greedy_logprobs_limit(
            example_prompts, max_tokens, num_logprobs
        )

    with vllm_runner(model_path, dtype=dtype, enforce_eager=True) as vllm_model:
        vllm_outputs = vllm_model.generate_greedy_logprobs(
            example_prompts, max_tokens, num_logprobs
        )

    check_logprobs_close(
        outputs_0_lst=hf_outputs,
        outputs_1_lst=vllm_outputs,
        name_0="hf",
        name_1="vllm",
    )

    if current_platform.is_rocm():
        torch.cuda.synchronize()

204

205
def test_compressed_tensors_no_enforce_eager(vllm_runner):
    """Check that generation works when `enforce_eager` is not passed."""
    checkpoint = os.path.join(
        models_path_prefix,
        "nm-testing/tinyllama-oneshot-w8w8-test-static-shape-change",
    )
    with vllm_runner(checkpoint) as llm:
        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        assert generated


212
213
214
@pytest.mark.parametrize(
    "model_args",
    [
        (
            os.path.join(
                models_path_prefix,
                "nm-testing/tinyllama-oneshot-w8a8-dynamic-token-v2",
            ),
            "tensor",
        ),
        (
            os.path.join(
                models_path_prefix,
                "nm-testing/tinyllama-oneshot-w8a8-channel-dynamic-token-v2",
            ),
            "channel",
        ),
    ],
)
@pytest.mark.parametrize(
    "use_aiter", [True, False] if current_platform.is_rocm() else [False]
)
def test_compressed_tensors_w8a8_dynamic_per_token(
    vllm_runner,
    model_args,
    use_aiter,
    monkeypatch,
):
    """Check the layer set-up of W8A8 INT8 models without static input scales.

    Args:
        vllm_runner: Fixture used as a context manager that loads the model.
        model_args: Tuple of (model path, expected scheme strategy).
        use_aiter: Whether to enable the ROCm AITER path for this run.
        monkeypatch: Fixture used to set the AITER environment variable.
    """
    model_path, strategy = model_args

    # `model_path` is prefixed with `models_path_prefix`, whereas the ROCm
    # allow-lists store bare model IDs. Match on the trailing model ID;
    # a plain `in` test would never match and skip every model on ROCm.
    if current_platform.is_rocm() and not model_path.endswith(
        tuple(ROCM_TRITON_SCALED_MM_SUPPORTED_INT8_MODEL)
    ):
        pytest.skip(f"Skip model {model_path} as it is not supported on ROCm.")

    if use_aiter:
        if not model_path.endswith(tuple(ROCM_AITER_SUPPORTED_INT8_MODEL)):
            pytest.skip(f"Skip model {model_path} as it is not support by aiter.")
        # this will enable VLLM_ROCM_USE_AITER_LINEAR
        monkeypatch.setenv("VLLM_ROCM_USE_AITER", "1")

    with vllm_runner(model_path, enforce_eager=True, dtype=torch.float16) as llm:

        def check_model(model):
            layer = model.model.layers[0]

            qkv_proj = layer.self_attn.qkv_proj

            assert isinstance(qkv_proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(qkv_proj.scheme, CompressedTensorsW8A8Int8)
            assert not qkv_proj.scheme.is_static_input_scheme
            assert qkv_proj.scheme.strategy == strategy
            assert qkv_proj.weight.dtype is torch.int8

        llm.apply_model(check_model)

        # Smoke-check that the loaded model can still generate.
        output = llm.generate_greedy(["Hello my name is"], max_tokens=4)
        assert output

263

264
@pytest.mark.skipif(
    current_platform.is_rocm(), reason="WNA16 is not supported on ROCm."
)
@pytest.mark.parametrize(
    "wNa16_args",
    [
        (
            os.path.join(
                models_path_prefix, "nm-testing/tinyllama-oneshot-w4a16-channel-v2"
            ),
            "channel",
            None,
            8,
            True,
            False,
        ),
        (
            os.path.join(
                models_path_prefix,
                "nm-testing/TinyLlama-1.1B-Chat-v1.0-W4A16-G128-Asym-Updated-ActOrder",
            ),
            "group",
            128,
            8,
            False,
            True,
        ),
    ],
)
@pytest.mark.skipif(
    not current_platform.is_cuda(), reason="The tests are skipped on non-CUDA platform."
)
def test_compressed_tensors_wNa16(vllm_runner, wNa16_args):
    """Check that WNA16 checkpoints load with the expected scheme settings.

    `wNa16_args` is a tuple of (model path, strategy, group size or None,
    pack factor, symmetric flag, has-g_idx flag).
    """
    model, strategy, group, pack_factor, symmetric, has_g_idx = wNa16_args
    with vllm_runner(model, enforce_eager=True) as llm:

        def check_model(model):
            qkv_proj = model.model.layers[0].self_attn.qkv_proj
            scheme = qkv_proj.scheme

            assert isinstance(qkv_proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(scheme, CompressedTensorsWNA16)

            assert scheme.strategy == strategy
            # A missing group in the parameters is compared against -1.
            assert scheme.group_size == (group if group is not None else -1)

            assert scheme.pack_factor == pack_factor
            assert scheme.symmetric == symmetric
            assert scheme.has_g_idx == has_g_idx

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        assert generated

313

314
315
316
@pytest.mark.skipif(
    not current_platform.is_cuda(), reason="This test is skipped on non-CUDA platform."
)
def test_compressed_tensors_w4a16_marlin24(vllm_runner):
    """Check that the W4A16 2:4 checkpoint selects the Sparse24 scheme."""
    checkpoint = os.path.join(
        models_path_prefix, "nm-testing/llama7b-one-shot-2_4-w4a16-marlin24-t"
    )
    with vllm_runner(checkpoint, enforce_eager=True) as llm:

        def check_model(model):
            first_layer = model.model.layers[0]
            proj = first_layer.self_attn.qkv_proj

            assert isinstance(proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(proj.scheme, CompressedTensorsW4A16Sparse24)
            assert proj.weight_packed.dtype is torch.int32

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        assert generated
334
335


336
@pytest.mark.skipif(
    current_platform.is_rocm(), reason="FP8 is not supported on ROCm."
)
def test_compressed_tensors_fp8(vllm_runner):
    """Check that the FP8 checkpoint loads with a W8A8 or W8A16 FP8 scheme."""
    checkpoint = os.path.join(
        models_path_prefix, "nm-testing/Meta-Llama-3-8B-FP8-compressed-tensors-test"
    )
    with vllm_runner(checkpoint, enforce_eager=True) as llm:

        def check_model(model):
            proj = model.model.layers[0].self_attn.qkv_proj

            assert isinstance(proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(
                proj.scheme, (CompressedTensorsW8A8Fp8, CompressedTensorsW8A16Fp8)
            )
            assert proj.input_scale.dtype is torch.float32

            # The remaining checks apply to the W8A8 scheme only.
            if not isinstance(proj.scheme, CompressedTensorsW8A8Fp8):
                return

            assert len(proj.input_scale.shape) == 0
            assert proj.weight.dtype is current_platform.fp8_dtype()
            assert proj.weight_scale.dtype is torch.float32
            assert len(proj.weight_scale.shape) == 0

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        assert generated
365
366


367
@pytest.mark.skipif(
    not current_platform.is_cuda(), reason="This test is skipped on non-CUDA platform."
)
def test_compressed_tensors_kv_cache(vllm_runner):
    """Check generation for a kv-cache-scheme checkpoint with `kv_cache_dtype="fp8"`."""
    checkpoint = os.path.join(
        models_path_prefix,
        "nm-testing/TinyLlama-1.1B-compressed-tensors-kv-cache-scheme",
    )
    runner_kwargs = {"enforce_eager": True, "kv_cache_dtype": "fp8"}
    with vllm_runner(checkpoint, **runner_kwargs) as llm:
        generated = llm.generate_greedy("Hello world!", max_tokens=4)
        assert generated
375
376


377
378
379
380
def _test_2of4_quant_models(qkv_proj, weight_strategy, input_strategy, format="dense"):
    """Assert that `qkv_proj` is configured as a quantized 2:4-sparse layer.

    This is a plain helper called from the `check_model` callbacks of the
    2of4 tests. Under pytest's default collection rules it is not collected
    as a test (its name does not start with `test`), so a `skipif` mark on it
    would have no effect; the calling tests carry their own skip marks.

    Args:
        qkv_proj: Layer to inspect.
        weight_strategy: Expected `scheme.weight_quant.strategy`.
        input_strategy: Expected `scheme.input_quant.strategy`.
        format: Expected sparsity format recorded for "Linear" in the
            quantization config's sparsity scheme map.
    """
    assert isinstance(qkv_proj.quant_method, CompressedTensorsLinearMethod)
    assert isinstance(qkv_proj.scheme, CompressedTensors24)

    assert qkv_proj.scheme.weight_quant.strategy == weight_strategy
    assert qkv_proj.scheme.input_quant.strategy == input_strategy
    assert qkv_proj.scheme.quantized

    sparsity_map = qkv_proj.quant_method.quantization_config.sparsity_scheme_map
    assert sparsity_map

    # Look the "Linear" entry up once and check both of its fields.
    linear_sparsity = sparsity_map.get("Linear")
    assert linear_sparsity.format == format
    assert linear_sparsity.sparsity_structure == "2:4"


394
@pytest.mark.skipif(
    not (current_platform.is_cuda() and current_platform.has_device_capability(90)),
    reason="Sparse FP8 is not yet supported on this GPU type.",
)
@pytest.mark.parametrize(
    "args_2of4",
    [
        (
            "nm-testing/Meta-Llama-3-8B-Instruct-FP8-Dynamic-2of4-testing",
            "channel",
            "token",
        ),
        (
            "nm-testing/Meta-Llama-3-8B-Instruct-FP8-Static-Per-Tensor-testing",
            "channel",
            "tensor",
        ),
        (
            "nm-testing/Meta-Llama-3-8B-Instruct-FP8-Static-testing",
            "tensor",
            "tensor",
        ),
        (
            "nm-testing/Meta-Llama-3-8B-Instruct-FP8-Dynamic-IA-Per-Tensor-Weight-testing",
            "tensor",
            "token",
        ),
    ],
)
def test_compressed_tensors_2of4_quant_fp8(vllm_runner, args_2of4):
    """Check FP8-quantized 2:4 checkpoints stored in the "dense" format.

    `args_2of4` is a tuple of (model name, weight strategy, input strategy).
    """
    model, weight_strategy, input_strategy = args_2of4
    with vllm_runner(model, enforce_eager=True) as llm:

        def check_model(model):
            proj = model.model.layers[0].self_attn.qkv_proj
            assert proj.scheme.weights_dtype == torch.float8_e4m3fn
            _test_2of4_quant_models(proj, weight_strategy, input_strategy)

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        print(generated)
        assert generated


441
@pytest.mark.skipif(
    not (current_platform.is_cuda() and current_platform.has_device_capability(90)),
    reason="Sparse FP8 is not yet supported on this GPU type.",
)
@pytest.mark.parametrize(
    "args_2of4",
    [
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-gsm8k-pruned.2of4-chnl_wts_per_tok_dyn_act_fp8-BitM",
            "channel",
            "token",
        ),
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-gsm8k-pruned.2of4-chnl_wts_tensor_act_fp8-BitM",
            "channel",
            "tensor",
        ),
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-gsm8k-pruned.2of4-tensor_wts_per_tok_dyn_act_fp8-BitM",
            "tensor",
            "token",
        ),
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-gsm8k-pruned.2of4-tensor_wts_tensor_act_fp8-BitM",
            "tensor",
            "tensor",
        ),
    ],
)
def test_compressed_tensors_2of4_quant_fp8_compressed(vllm_runner, args_2of4):
    """Check FP8-quantized 2:4 checkpoints stored as "sparse-24-bitmask".

    `args_2of4` is a tuple of (model name, weight strategy, input strategy).
    """
    model, weight_strategy, input_strategy = args_2of4
    with vllm_runner(model, enforce_eager=True) as llm:

        def check_model(model):
            proj = model.model.layers[0].self_attn.qkv_proj
            assert proj.scheme.weights_dtype == torch.float8_e4m3fn
            _test_2of4_quant_models(
                proj, weight_strategy, input_strategy, format="sparse-24-bitmask"
            )

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        print(generated)
        assert generated


@pytest.mark.skipif(
    not sparse_cutlass_supported(),
    reason="cutlass is not yet supported on this GPU type.",
)
@pytest.mark.parametrize(
    "args_2of4",
    [
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-gsm8k-pruned.2of4-chnl_wts_per_tok_dyn_act_int8-BitM",
            "channel",
            "token",
        ),
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-gsm8k-pruned.2of4-chnl_wts_tensor_act_int8-BitM",
            "channel",
            "tensor",
        ),
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-gsm8k-pruned.2of4-tensor_wts_per_tok_dyn_act_int8-BitM",
            "tensor",
            "token",
        ),
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-gsm8k-pruned.2of4-tensor_wts_tensor_act_int8-BitM",
            "tensor",
            "tensor",
        ),
    ],
)
def test_compressed_tensors_2of4_quant_int8_compressed(vllm_runner, args_2of4):
    """Check INT8-quantized 2:4 checkpoints stored as "sparse-24-bitmask".

    `args_2of4` is a tuple of (model name, weight strategy, input strategy).
    """
    model, weight_strategy, input_strategy = args_2of4
    with vllm_runner(model, enforce_eager=True) as llm:

        def check_model(model):
            proj = model.model.layers[0].self_attn.qkv_proj
            assert proj.scheme.weights_dtype == torch.int8
            _test_2of4_quant_models(
                proj, weight_strategy, input_strategy, format="sparse-24-bitmask"
            )

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        print(generated)
        assert generated


@pytest.mark.skipif(
    not sparse_cutlass_supported(),
    reason="Sparse FP8 is not yet supported on this GPU type.",
)
@pytest.mark.parametrize(
    "args_2of4",
    [
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-INT8-Dynamic-IA-Per-Channel-Weight-testing",
            "channel",
            "token",
        ),
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-INT8-Static-testing",
            "tensor",
            "tensor",
        ),
        (
            "nm-testing/TinyLlama-1.1B-Chat-v1.0-INT8-Dynamic-IA-Per-Tensor-Weight-testing",
            "tensor",
            "token",
        ),
    ],
)
def test_compressed_tensors_2of4_quant_int8(vllm_runner, args_2of4):
    """Check INT8-quantized 2:4 checkpoints stored in the "dense" format.

    `args_2of4` is a tuple of (model name, weight strategy, input strategy).
    """
    model, weight_strategy, input_strategy = args_2of4
    with vllm_runner(model, enforce_eager=True) as llm:

        def check_model(model):
            proj = model.model.layers[0].self_attn.qkv_proj
            assert proj.scheme.weights_dtype == torch.int8
            _test_2of4_quant_models(proj, weight_strategy, input_strategy)

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        print(generated)
        assert generated


587
588
@pytest.mark.skipif(
    not sparse_cutlass_supported(),
    reason="2of4 Sparse is not yet supported on this GPU type.",
)
@pytest.mark.parametrize(
    "args_2of4",
    ["nm-testing/TinyLlama-1.1B-Chat-v1.0-2of4-Sparse-Dense-Compressor"],
)
def test_compressed_tensors_2of4_sparse(vllm_runner, args_2of4):
    """Check an unquantized 2:4-sparse checkpoint stored in the "dense" format."""
    model = args_2of4
    with vllm_runner(model, enforce_eager=True) as llm:

        def check_model(model):
            proj = model.model.layers[0].self_attn.qkv_proj
            scheme = proj.scheme

            assert isinstance(proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(scheme, CompressedTensors24)

            # Sparse-only checkpoint: no weight or input quantization.
            assert scheme.weight_quant is None
            assert scheme.input_quant is None
            assert not scheme.quantized

            sparsity_map = proj.quant_method.quantization_config.sparsity_scheme_map
            assert sparsity_map
            assert sparsity_map.get("Linear").format == "dense"
            assert sparsity_map.get("Linear").sparsity_structure == "2:4"

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        print(generated)
        assert generated
619
620
621
622
623
624
625


@pytest.mark.skipif(
    not sparse_cutlass_supported(),
    reason="Cutlass is not yet supported on this GPU type.",
)
@pytest.mark.parametrize(
    "args_2of4", ["nm-testing/llama2.c-stories42M-pruned2.4-compressed"]
)
def test_compressed_tensors_2of4_sparse_compressed(vllm_runner, args_2of4):
    """Check an unquantized 2:4-sparse checkpoint stored as "sparse-24-bitmask"."""
    model = args_2of4
    with vllm_runner(model, enforce_eager=True) as llm:

        def check_model(model):
            proj = model.model.layers[0].self_attn.qkv_proj
            scheme = proj.scheme

            assert isinstance(proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(scheme, CompressedTensors24)

            # Sparse-only checkpoint: no weight or input quantization.
            assert scheme.weight_quant is None
            assert scheme.input_quant is None
            assert not scheme.quantized

            sparsity_map = proj.quant_method.quantization_config.sparsity_scheme_map
            assert sparsity_map
            assert sparsity_map.get("Linear").format == "sparse-24-bitmask"
            assert sparsity_map.get("Linear").sparsity_structure == "2:4"

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        print(generated)
        assert generated
652
653


654
655
656
@pytest.mark.skipif(
    not current_platform.is_cuda(), reason="This test is skipped on non-CUDA platform."
)
@pytest.mark.parametrize(
    "args",
    [
        # TODO: Enable once model is available again
        # ("nm-testing/TinyLlama-1.1B-Chat-v1.0-NVFP4A16", CompressedTensorsW4A16Fp4),
        (
            os.path.join(
                models_path_prefix, "nm-testing/TinyLlama-1.1B-Chat-v1.0-NVFP4"
            ),
            CompressedTensorsW4A4Fp4,
        ),
    ],
)
def test_compressed_tensors_nvfp4(vllm_runner, args):
    """Check that an NVFP4 checkpoint loads with the expected FP4 scheme.

    Args:
        vllm_runner: Fixture used as a context manager that loads the model.
        args: Tuple of (model path, expected scheme class).

    Raises:
        AssertionError: If the qkv projection uses neither the expected
            scheme nor the accepted W4A16 alternative.
    """
    model, scheme = args
    with vllm_runner(model, enforce_eager=True) as llm:

        def check_model(model):
            layer = model.model.layers[0]

            qkv_proj = layer.self_attn.qkv_proj
            assert isinstance(qkv_proj.quant_method, CompressedTensorsLinearMethod)

            # The W4A16 scheme is accepted in place of the expected one only
            # when cutlass FP4 is reported as unsupported.
            scheme_ok = isinstance(qkv_proj.scheme, scheme) or (
                isinstance(qkv_proj.scheme, CompressedTensorsW4A16Fp4)
                and not cutlass_fp4_supported()
            )
            if not scheme_ok:
                raise AssertionError("FP4 Scheme Mismatch")

        llm.apply_model(check_model)
        output = llm.generate_greedy("Hello my name is", max_tokens=4)
        print(output)
        assert output
703
704
705


@pytest.mark.skipif(
    not (current_platform.is_cuda() and current_platform.has_device_capability(90)),
    reason="W4A8 FP8 is not yet supported on this GPU type.",
)
@pytest.mark.parametrize(
    "args",
    [("czhu-cohere/TinyLlama-1.1B-Chat-v1.0-W4A8-e2e", CompressedTensorsW4A8Fp8)],
)
def test_compressed_tensors_w4a8_fp8(vllm_runner, args):
    """Check that every projection of layer 0 uses the W4A8 FP8 scheme.

    `args` is a tuple of (model name, expected scheme class).
    """
    model, scheme = args
    with vllm_runner(model, enforce_eager=True) as llm:

        def check_model(model):
            first_layer = model.model.layers[0]
            projections = (
                first_layer.self_attn.qkv_proj,
                first_layer.self_attn.o_proj,
                first_layer.mlp.gate_up_proj,
                first_layer.mlp.down_proj,
            )

            for proj in projections:
                assert isinstance(proj.quant_method, CompressedTensorsLinearMethod)
                assert isinstance(proj.scheme, scheme)

                assert proj.weight_packed.dtype is torch.int32
                assert proj.weight_scale.dtype is torch.float8_e4m3fn
                assert proj.weight_chan_scale.dtype is torch.float32
                assert proj.scheme.group_size == 128

        llm.apply_model(check_model)
        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        print(generated)
        assert generated
738
739


740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
@pytest.mark.skipif(
    not current_platform.is_cuda(), reason="This test is skipped on non-CUDA platform."
)
@pytest.mark.parametrize(
    "model,prompt,exp_perplexity",
    [
        (
            "nm-testing/Llama-3.2-1B-Instruct-spinquantR1R2R4-w4a16",
            "Flat is better than nested.\nSparse is better than dense.",
            150.0,
        ),
        (
            "nm-testing/Llama-3.2-1B-Instruct-quip-w4a16",
            "Flat is better than nested.\nSparse is better than dense.",
            150.0,
        ),
    ],
)
def test_compressed_tensors_transforms_perplexity(
    vllm_runner, model, prompt, exp_perplexity
):
    """Check that the prompt perplexity does not exceed `exp_perplexity`."""
    with vllm_runner(model, enforce_eager=True) as llm:
        perplexities = llm.generate_prompt_perplexity([prompt])
        measured = perplexities[0]
        print(measured)
        assert measured <= exp_perplexity
765
766
767
768


def test_compressed_tensors_fp8_block_enabled(vllm_runner):
    """Check that a block-FP8 checkpoint uses the W8A8 block FP8 linear op."""
    checkpoint = "RedHatAI/Qwen3-0.6B-FP8-BLOCK"
    with vllm_runner(checkpoint, enforce_eager=True) as llm:
        fp8_dtype = current_platform.fp8_dtype()

        def check_model(model):
            proj = model.model.layers[0].self_attn.qkv_proj
            assert isinstance(proj.quant_method, CompressedTensorsLinearMethod)
            assert isinstance(proj.scheme, CompressedTensorsW8A8Fp8)

            block_linear = proj.scheme.w8a8_block_fp8_linear
            assert isinstance(block_linear, W8A8BlockFp8LinearOp)

            # Both the weight and its scale are expected to be 2-D here.
            assert proj.weight.dtype is fp8_dtype
            assert proj.weight_scale.dtype is torch.float32
            assert len(proj.weight.shape) == 2
            assert len(proj.weight_scale.shape) == 2

            input_quant_op = block_linear.input_quant_op
            assert isinstance(input_quant_op, QuantFP8)
            accepted_forwards = (
                input_quant_op.forward_cuda,
                input_quant_op.forward_hip,
            )
            assert input_quant_op._forward_method in accepted_forwards

        llm.apply_model(check_model)

        generated = llm.generate_greedy("Hello my name is", max_tokens=4)
        assert generated
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844


@pytest.mark.skipif(
    not current_platform.is_cuda(),
    reason="This test is not for non-CUDA platforms",
)
def test_compressed_tensors_moe_ignore_with_model(vllm_runner):
    """
    Integration test for the MoE layer ignore list, using a real model.

    Loads a compressed-tensors quantized MoE checkpoint and checks that the
    experts of layer 0 use the quantized MoE method while the experts of
    layer 3 use UnquantizedFusedMoEMethod.

    Expected model structure:
    - Compressed-tensors quantized MoE model (e.g., Mixtral-based)
    - Config with ignore list containing specific MoE layers
    - Multiple MoE layers where some are quantized and some are not
    """

    # model_path = "nm-testing/tinysmokeqwen3moe-W4A16-first-only" # CT 12.3
    model_path = "nm-testing/tinysmokeqwen3moe-W4A16-first-only-CTstable"  # CT 12.2

    with vllm_runner(model_path, enforce_eager=True) as llm:

        def check_model(model):
            from vllm.model_executor.layers.fused_moe import FusedMoE
            from vllm.model_executor.layers.quantization.compressed_tensors.compressed_tensors_moe import (  # noqa: E501
                CompressedTensorsMoEMethod,
            )

            layers = model.model.layers

            # Layer 0 experts: expected to be quantized.
            quantized_experts = layers[0].mlp.experts
            assert isinstance(quantized_experts, FusedMoE)
            assert isinstance(
                quantized_experts.quant_method, CompressedTensorsMoEMethod
            )

            # Layer 3 experts: expected to be ignored, hence unquantized.
            unquantized_experts = layers[3].mlp.experts
            assert isinstance(unquantized_experts, FusedMoE)
            assert isinstance(
                unquantized_experts.quant_method, UnquantizedFusedMoEMethod
            )

        llm.apply_model(check_model)

        # Confirm the model still produces output.
        generated = llm.generate_greedy("Hello, my name is", max_tokens=4)
        assert generated