test_sequence_parallel.py 10.2 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
4
5
6
7
8
9
"""
WARNING: This test runs in both single-node (4 GPUs) and multi-node
 (2 nodes with 2 GPUs each) modes. If the test only uses 2 GPUs, it is
 important to set the distributed backend to "mp" to avoid Ray scheduling
 all workers in a node other than the head node, which can cause the test
 to fail.
"""
10

11
12
13
import json
import os
from dataclasses import dataclass
14
from typing import Literal, NamedTuple
15
16
17

import pytest

18
from vllm.config.compilation import CompilationMode
19
from vllm.config.model import RunnerOption
20
from vllm.logger import init_logger
21
from vllm.platforms import current_platform
22
from vllm.utils.torch_utils import is_torch_equal_or_newer
23
24
25
26
27
28
29
30
31
32
33

from ..models.registry import HF_EXAMPLE_MODELS
from ..utils import compare_two_settings, create_new_process_for_each_test

# Module-level logger, registered under the name "test_sequence_parallel".
logger = init_logger("test_sequence_parallel")

# True only when the VLLM_MULTI_NODE environment variable is exactly "1";
# an unset variable falls back to "0", i.e. False.
VLLM_MULTI_NODE = os.getenv("VLLM_MULTI_NODE", "0") == "1"


class ParallelSetup(NamedTuple):
    """One parallelism/compilation configuration exercised by the SP tests.

    Field order is significant: ``_compare_sp`` unpacks instances
    positionally.
    """

    # Value passed to ``--tensor-parallel-size``.
    tp_size: int
    # Value passed to ``--pipeline-parallel-size``.
    pp_size: int
    # Forwarded as ``pass_config["enable_fusion"]`` in the compilation config.
    enable_fusion: bool
    # When true, ``--enforce-eager`` is added to the command-line arguments.
    eager_mode: bool
    # When true, ``--enable-chunked-prefill`` is added to the arguments.
    chunked_prefill: bool


class SPTestOptions(NamedTuple):
    multi_node_only: bool
42
    load_format: str | None = None
43
44
45
46
47
48


@dataclass
class SPTestSettings:
    """Set of parameter combinations to run for a single model.

    Instances are normally created through one of the factory methods
    (``detailed``, ``fast``, ``fp8_quant``) and expanded into individual
    test parameters with ``iter_params``.
    """

    # Parallel configurations to run.
    parallel_setups: list[ParallelSetup]
    # Distributed executor backends to run each setup with.
    distributed_backends: list[str]
    # Runner option shared by every generated parameter tuple.
    runner: RunnerOption
    # Options shared by every generated parameter tuple.
    test_options: SPTestOptions

    @staticmethod
    def _sweep_setups(tp_base: int, pp_base: int) -> list[ParallelSetup]:
        """Build the eager-mode x PP-multiplier x chunked-prefill grid.

        The iteration order (eager mode outermost, chunked prefill innermost)
        determines the order of the generated test parameters, so it must be
        kept stable. Fusion is disabled for every setup in this grid.
        """
        return [
            ParallelSetup(
                tp_size=tp_base,
                pp_size=pp_multiplier * pp_base,
                enable_fusion=False,
                eager_mode=eager_mode_val,
                chunked_prefill=chunked_prefill_val,
            )
            for eager_mode_val in (False, True)
            for pp_multiplier in (1, 2)
            for chunked_prefill_val in (False, True)
        ]

    @staticmethod
    def _assemble(
        parallel_setups: list[ParallelSetup],
        *,
        runner: RunnerOption,
        multi_node_only: bool,
        load_format: str | None,
    ) -> "SPTestSettings":
        """Wrap ``parallel_setups`` with the backends and options all factories share."""
        return SPTestSettings(
            parallel_setups=parallel_setups,
            # A fresh list per instance so settings never share mutable state.
            distributed_backends=["mp", "ray"],
            runner=runner,
            test_options=SPTestOptions(
                multi_node_only=multi_node_only, load_format=load_format
            ),
        )

    @staticmethod
    def detailed(
        *,
        tp_base: int = 2,
        pp_base: int = 1,
        multi_node_only: bool = False,
        runner: RunnerOption = "auto",
        load_format: str | None = None,
    ):
        """Return settings covering the full eager/PP/chunked-prefill grid.

        Args:
            tp_base: Tensor-parallel size used for every setup.
            pp_base: Base pipeline-parallel size; setups use 1x and 2x of it.
            multi_node_only: Stored in the test options.
            runner: Runner option stored on the settings.
            load_format: Optional load format stored in the test options.
        """
        return SPTestSettings._assemble(
            SPTestSettings._sweep_setups(tp_base, pp_base),
            runner=runner,
            multi_node_only=multi_node_only,
            load_format=load_format,
        )

    @staticmethod
    def fast(
        *,
        tp_base: int = 2,
        pp_base: int = 1,
        runner: RunnerOption = "auto",
        multi_node_only: bool = False,
        load_format: str | None = None,
    ):
        """Return settings for the quick test variant.

        This currently generates exactly the same grid as ``detailed``; the
        separate entry point is kept so callers can be tuned independently.

        Args:
            tp_base: Tensor-parallel size used for every setup.
            pp_base: Base pipeline-parallel size; setups use 1x and 2x of it.
            runner: Runner option stored on the settings.
            multi_node_only: Stored in the test options.
            load_format: Optional load format stored in the test options.
        """
        return SPTestSettings._assemble(
            SPTestSettings._sweep_setups(tp_base, pp_base),
            runner=runner,
            multi_node_only=multi_node_only,
            load_format=load_format,
        )

    @staticmethod
    def fp8_quant(
        *,
        tp_base: int = 2,
        pp_base: int = 1,
        runner: RunnerOption = "auto",
        multi_node_only: bool = False,
        load_format: str | None = None,
    ):
        """Return settings for FP8 models: eager mode, fusion off and on.

        Unlike the grid factories, the pipeline-parallel size is exactly
        ``pp_base`` and chunked prefill is always disabled.

        Args:
            tp_base: Tensor-parallel size used for both setups.
            pp_base: Pipeline-parallel size used for both setups.
            runner: Runner option stored on the settings.
            multi_node_only: Stored in the test options.
            load_format: Optional load format stored in the test options.
        """
        parallel_setups = [
            ParallelSetup(
                tp_size=tp_base,
                pp_size=pp_base,
                enable_fusion=fusion_val,
                eager_mode=True,
                chunked_prefill=False,
            )
            for fusion_val in (False, True)
        ]
        return SPTestSettings._assemble(
            parallel_setups,
            runner=runner,
            multi_node_only=multi_node_only,
            load_format=load_format,
        )

    def iter_params(self, model_id: str):
        """Yield one parameter tuple per (parallel setup, backend) pair.

        Each tuple is ``(model_id, parallel_setup, backend, runner,
        test_options)``. Setups form the outer loop and backends the inner
        loop.
        """
        opts = self.test_options

        for parallel_setup in self.parallel_setups:
            for backend in self.distributed_backends:
                yield (
                    model_id,
                    parallel_setup,
                    backend,
                    self.runner,
                    opts,
                )
155
156
157
158
159
160


def _compare_sp(
    model_id: str,
    parallel_setup: ParallelSetup,
    distributed_backend: str,
    runner: RunnerOption,
    test_options: SPTestOptions,
    num_gpus_available: int,
    use_inductor_graph_partition: bool,
    enable_async_tp: bool,
    *,
    method: Literal["generate", "encode"],
    is_multimodal: bool,
):
    """Compare a sequence-parallel run against a plain tensor-parallel run.

    Two argument lists are built for ``model_id`` and handed to
    ``compare_two_settings``: one with TP, PP and the sequence-parallelism
    compilation pass enabled, and a baseline with TP only on the "mp"
    backend.

    Args:
        model_id: Model to look up in ``HF_EXAMPLE_MODELS`` and to run.
        parallel_setup: TP/PP sizes plus fusion, eager and chunked-prefill
            switches.
        distributed_backend: Executor backend for the sequence-parallel run.
        runner: Forwarded as ``--runner`` unless it is "auto".
        test_options: Multi-node restriction and optional load format.
        num_gpus_available: Number of GPUs available; the test is skipped
            when it is below ``tp_size * pp_size``.
        use_inductor_graph_partition: Forwarded into the compilation config.
        enable_async_tp: Forwarded into the compilation pass config.
        method: Passed through to ``compare_two_settings``.
        is_multimodal: When true, dummy-weight size overrides are nested
            under ``text_config`` instead of being applied at the top level.
    """
    (
        tp_size,
        pp_size,
        enable_fusion,
        eager_mode,
        chunked_prefill,
    ) = parallel_setup

    multi_node_only, load_format = test_options

    model_info = HF_EXAMPLE_MODELS.find_hf_info(model_id)
    model_info.check_transformers_version(on_fail="skip")

    trust_remote_code = model_info.trust_remote_code
    tokenizer_mode = model_info.tokenizer_mode
    # Work on a copy: the overrides below must not leak into the shared
    # registry entry, which other parametrizations of the same model reuse.
    hf_overrides = dict(model_info.hf_overrides or {})
    require_embed_inputs = model_info.require_embed_inputs

    if load_format == "dummy":
        # Avoid OOM
        text_overrides = {
            "num_hidden_layers": 4,
            "hidden_size": 512,
            "intermediate_size": 800,
            "num_attention_heads": 4,
            "num_key_value_heads": 1,
        }

        if is_multimodal:
            hf_overrides.update({"text_config": text_overrides})
        else:
            hf_overrides.update(text_overrides)
    else:
        # Real weights are needed in this branch.
        model_info.check_available_online(on_fail="skip")

    if num_gpus_available < tp_size * pp_size:
        pytest.skip(f"Need at least {tp_size} x {pp_size} GPUs")
    if VLLM_MULTI_NODE and distributed_backend == "mp":
        pytest.skip(
            "Skipping multi-node pipeline parallel test for "
            "multiprocessing distributed backend"
        )
    if multi_node_only and not VLLM_MULTI_NODE:
        pytest.skip("Not in multi-node setting")

    common_args = [
        # use half precision for speed and memory savings in CI environment
        "--dtype",
        "float16",
        "--max-model-len",
        "2048",
        "--max-num-seqs",
        "8",
    ]
    if chunked_prefill:
        common_args.append("--enable-chunked-prefill")
    if eager_mode:
        common_args.append("--enforce-eager")
    if runner != "auto":
        common_args.extend(["--runner", runner])
    if trust_remote_code:
        common_args.append("--trust-remote-code")
    if tokenizer_mode:
        common_args.extend(["--tokenizer-mode", tokenizer_mode])
    if load_format:
        common_args.extend(["--load-format", load_format])
    if hf_overrides:
        common_args.extend(["--hf-overrides", json.dumps(hf_overrides)])
    if require_embed_inputs:
        common_args.extend(
            [
                "--skip-tokenizer-init",
                "--enable-prompt-embeds",
                "--enable-mm-embeds",
            ]
        )

    compilation_config = {
        "mode": CompilationMode.VLLM_COMPILE,
        "compile_sizes": [4, 8],
        "pass_config": {
            "enable_sequence_parallelism": True,
            "enable_async_tp": enable_async_tp,
            "enable_fusion": enable_fusion,
            "enable_noop": True,
        },
        "use_inductor_graph_partition": use_inductor_graph_partition,
    }

    # Configuration under test: TP + PP with the sequence-parallelism pass.
    tp_sp_args = [
        *common_args,
        "--tensor-parallel-size",
        str(tp_size),
        "--pipeline-parallel-size",
        str(pp_size),
        "--distributed-executor-backend",
        distributed_backend,
        "--compilation_config",
        json.dumps(compilation_config),
    ]

    # Baseline: TP only, always on the "mp" backend, no compilation config.
    tp_args = [
        *common_args,
        "--tensor-parallel-size",
        str(tp_size),
        "--distributed-executor-backend",
        "mp",
    ]

    compare_two_settings(model_id, tp_sp_args, tp_args, method=method)
280
281
282
283


# Maps each model ID to the settings that generate its test parameters.
SP_TEXT_GENERATION_MODELS = {
    # [Decoder-only]
    "hmellor/tiny-random-LlamaForCausalLM": SPTestSettings.fast(),
    "RedHatAI/Meta-Llama-3.1-8B-Instruct-FP8": SPTestSettings.fp8_quant(),
}

# Model IDs that are actually parametrized; entries of
# SP_TEXT_GENERATION_MODELS not listed here are filtered out.
SP_TEST_MODELS = [
    # TODO support other models
    # [LANGUAGE GENERATION]
    "hmellor/tiny-random-LlamaForCausalLM",
    "RedHatAI/Meta-Llama-3.1-8B-Instruct-FP8",
]


@pytest.mark.parametrize(
    (
        "model_id",
        "parallel_setup",
        "distributed_backend",
        "runner",
        "test_options",
    ),
    [
        params
        for model_id, settings in SP_TEXT_GENERATION_MODELS.items()
        # Filter before expanding so excluded models generate no parameters.
        if model_id in SP_TEST_MODELS
        for params in settings.iter_params(model_id)
    ],
)
@pytest.mark.parametrize("use_inductor_graph_partition", [True, False])
@pytest.mark.parametrize("enable_async_tp", [False])  # TODO: enable async TP
@create_new_process_for_each_test()
def test_tp_sp_generation(
    model_id: str,
    parallel_setup: ParallelSetup,
    distributed_backend: str,
    runner: RunnerOption,
    test_options: SPTestOptions,
    num_gpus_available,
    use_inductor_graph_partition: bool,
    enable_async_tp: bool,
):
    """Check text generation with sequence parallelism against plain TP.

    Applies the environment-dependent skips below, then delegates to
    ``_compare_sp`` with ``method="generate"`` for a non-multimodal model.
    """
    if use_inductor_graph_partition and not is_torch_equal_or_newer("2.9.0.dev"):
        pytest.skip("inductor graph partition is only available in PyTorch 2.9+")

    # Skip FP8 SP-only (no async TP) runs on devices below compute
    # capability 9.0, e.g. sm89.
    if (
        "fp8" in model_id.lower()
        and current_platform.get_device_capability() < (9, 0)
        and (not enable_async_tp)
    ):
        pytest.skip("FP8 reduction support begins with sm90 capable devices.")

    _compare_sp(
        model_id,
        parallel_setup,
        distributed_backend,
        runner,
        test_options,
        num_gpus_available,
        use_inductor_graph_partition,
        enable_async_tp=enable_async_tp,
        method="generate",
        is_multimodal=False,
    )