parallel.py 37.4 KB
Newer Older
1
2
3
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

4
import os
5
import socket
6
from collections.abc import Callable
7
from typing import TYPE_CHECKING, Any, Literal, overload
8

9
import regex as re
10
import torch
11
from pydantic import Field, field_validator, model_validator
12
from torch.distributed import ProcessGroup, ReduceOp, Store
13
14
15
16
17
18
from typing_extensions import Self

import vllm.envs as envs
from vllm.config.utils import config
from vllm.logger import init_logger
from vllm.platforms import current_platform
19
from vllm.utils.network_utils import get_open_ports_list
20
21
22
23
24

if TYPE_CHECKING:
    from ray.runtime_env import RuntimeEnv
    from ray.util.placement_group import PlacementGroup

    from vllm.v1.executor import Executor
else:
    # These names only appear in field annotations in this module; aliasing
    # them to ``Any`` at runtime means Ray and the executor module are not
    # imported just to define the config.
    RuntimeEnv = Any
    PlacementGroup = Any
    Executor = Any

logger = init_logger(__name__)

# Accepted syntax for ``numa_bind_cpus`` entries: comma-separated CPU ids or
# ``start-end`` ranges, e.g. "0-3" or "0,2,4-7".
_NUMACTL_CPUSET_PATTERN = re.compile(r"^\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*$")

ExpertPlacementStrategy = Literal["linear", "round_robin"]
DistributedExecutorBackend = Literal["ray", "mp", "uni", "external_launcher"]
DataParallelBackend = Literal["ray", "mp"]
EPLBPolicyOption = Literal["default"]
DCPCommBackend = Literal["ag_rs", "a2a"]
EPLBCommunicatorBackend = Literal["torch_nccl", "torch_gloo", "pynccl"]
All2AllBackend = Literal[
    # "naive" and "pplx" have been removed; they are still accepted so that
    # ParallelConfig can warn and fall back to "allgather_reducescatter".
    "naive",
    "pplx",
    "deepep_high_throughput",
    "deepep_low_latency",
    "mori",
    "nixl_ep",
    "allgather_reducescatter",
    "flashinfer_all2allv",  # temporary alias for flashinfer_nvlink_two_sided
    "flashinfer_nvlink_two_sided",
    "flashinfer_nvlink_one_sided",
]


@config
class EPLBConfig:
    """Configuration for Expert Parallel Load Balancing (EPLB)."""

    window_size: int = Field(default=1000, gt=0)
    """Window size for expert load recording."""
    step_interval: int = Field(default=3000, gt=0)
    """
    Interval for rearranging experts in expert parallelism.

    Note that if this is greater than the EPLB window size, only the metrics
    of the last `window_size` steps will be used for rearranging experts.
    """

    num_redundant_experts: int = Field(default=0, ge=0)
    """Number of redundant experts to use for expert parallelism."""

    log_balancedness: bool = False
    """
    Log the balancedness of expert parallelism every
    `log_balancedness_interval` steps.
    This is turned off by default since it will cause communication overhead.
    """
    log_balancedness_interval: int = Field(default=1, gt=0)
    """
    Interval for logging the balancedness.
    """
    use_async: bool = False
    """
    Whether to use non-blocking EPLB. Only supported with the "default" policy.
    """

    policy: EPLBPolicyOption = "default"
    """The policy type for expert parallel load balancing (EPLB)."""

    communicator: EPLBCommunicatorBackend | None = None
    """
    Backend for EPLB expert weight communication:
    - "torch_nccl": Use torch.distributed on the device process group
    - "torch_gloo": Use torch.distributed gloo with CPU staging
    - "pynccl": Use PyNccl send/recv
    - None: Auto-select backend ("torch_gloo" for async, "torch_nccl" for sync)
    """

    @model_validator(mode="after")
    def _validate_eplb_config(self) -> Self:
        """Reject option combinations that EPLB does not support.

        Raises:
            ValueError: if async EPLB is combined with a non-default policy,
                or balancedness logging is enabled with a non-positive interval.
        """
        if self.use_async and self.policy != "default":
            raise ValueError("Async EPLB is only supported with the default policy.")
        # `Field(gt=0)` already rejects non-positive intervals at construction;
        # this explicit check is kept as a defensive guard.
        if self.log_balancedness and self.log_balancedness_interval <= 0:
            raise ValueError("log_balancedness_interval must be greater than 0.")
        return self


@config
class ParallelConfig:
    """Configuration for the distributed execution."""

    pipeline_parallel_size: int = 1
    """Number of pipeline parallel groups."""
    tensor_parallel_size: int = 1
    """Number of tensor parallel groups."""
    prefill_context_parallel_size: int = 1
    """Number of prefill context parallel groups."""
    data_parallel_size: int = 1
    """Number of data parallel groups. MoE layers will be sharded according to
    the product of the tensor parallel size and data parallel size."""
    data_parallel_size_local: int = 1
    """Number of local data parallel groups. Must not exceed
    `data_parallel_size`."""
    data_parallel_rank: int = 0
    """Rank of the data parallel group."""
    data_parallel_rank_local: int | None = None
    """Local rank of the data parallel group,
    set only in SPMD mode."""
    data_parallel_master_ip: str = "127.0.0.1"
    """IP of the data parallel master."""
    data_parallel_rpc_port: int = 29550
    """Port for data parallel messaging."""
    data_parallel_master_port: int = 29500
    """Port of the data parallel master."""
    data_parallel_backend: DataParallelBackend = "mp"
    """Backend to use for data parallel, either "mp" or "ray"."""
    data_parallel_external_lb: bool = False
    """Whether to use "external" DP LB mode. Applies only to online serving
    and when data_parallel_size > 1. This is useful for a "one-pod-per-rank"
    wide-EP setup in Kubernetes. Set implicitly when --data-parallel-rank
    is provided explicitly to vllm serve."""
    data_parallel_hybrid_lb: bool = False
    """Whether to use "hybrid" DP LB mode. Applies only to online serving
    and when data_parallel_size > 0. Enables running an AsyncLLM
    and API server on a "per-node" basis where vLLM load balances
    between local data parallel ranks, but an external LB balances
    between vLLM nodes/replicas. Set explicitly in conjunction with
    --data-parallel-start-rank."""
    is_moe_model: bool | None = None
    """Whether the deployed model is MoE (if known)."""

    enable_expert_parallel: bool = False
    """Use expert parallelism instead of tensor parallelism for MoE layers."""
    enable_ep_weight_filter: bool = False
    """Skip non-local expert weights during model loading when expert
    parallelism is active.  Each rank only reads its own expert shard from
    disk, which can drastically reduce storage I/O for MoE models with
    per-expert weight tensors (e.g. DeepSeek, Mixtral, Kimi-K2.5).  Has no
    effect on 3D fused-expert checkpoints (e.g. GPT-OSS) or non-MoE
    models."""
    enable_eplb: bool = False
    """Enable expert parallelism load balancing for MoE layers."""
    eplb_config: EPLBConfig = Field(default_factory=EPLBConfig)
    """Expert parallelism load balancing (EPLB) configuration."""
    expert_placement_strategy: ExpertPlacementStrategy = "linear"
    """The expert placement strategy for MoE layers:

    - "linear": Experts are placed in a contiguous manner. For example, with 4
      experts and 2 ranks, rank 0 will have experts [0, 1] and rank 1 will have
      experts [2, 3].
    - "round_robin": Experts are placed in a round-robin manner. For example,
      with 4 experts and 2 ranks, rank 0 will have experts [0, 2] and rank 1
      will have experts [1, 3]. This strategy can help improve load balancing
      for grouped expert models with no redundant experts."""
    all2all_backend: All2AllBackend = "allgather_reducescatter"
    """All2All backend for MoE expert parallel communication. Available options:

    - "allgather_reducescatter": All2all based on allgather and reducescatter
    - "deepep_high_throughput": Use deepep high-throughput kernels
    - "deepep_low_latency": Use deepep low-latency kernels
    - "mori": Use mori kernels
    - "nixl_ep": Use nixl-ep kernels
    - "flashinfer_nvlink_two_sided": Use flashinfer two-sided kernels for mnnvl
    - "flashinfer_nvlink_one_sided": Use flashinfer high-throughput a2a kernels
    - "flashinfer_all2allv": Temporary alias for "flashinfer_nvlink_two_sided"

    The removed "pplx" and "naive" backends are still accepted, but fall back
    to "allgather_reducescatter" with a warning."""

    max_parallel_loading_workers: int | None = None
    """Maximum number of parallel loading workers when loading model
    sequentially in multiple batches. To avoid RAM OOM when using tensor
    parallel and large models."""

    disable_custom_all_reduce: bool = False
    """Disable the custom all-reduce kernel and fall back to NCCL."""

    enable_elastic_ep: bool = False
    """Enable elastic expert parallelism with stateless NCCL groups for DP/EP.
    Requires `enable_eplb=True` and is not supported with pipeline parallelism
    or with external/hybrid DP load balancing."""

    enable_dbo: bool = False
    """Enable dual batch overlap for the model executor."""
    ubatch_size: int = 0
    """Number of microbatches to split each batch into. Values greater than 1
    enable microbatching; ignored when `enable_dbo` is set (DBO always uses
    2 microbatches)."""

    dbo_decode_token_threshold: int = 32
    """The threshold for dual batch overlap for batches only containing decodes.
    If the number of tokens in the request is greater than this threshold,
    microbatching will be used. Otherwise, the request will be processed in a
    single batch."""
    dbo_prefill_token_threshold: int = 512  # TODO(lucas): tune
    """The threshold for dual batch overlap for batches that contain one or more
    prefills. If the number of tokens in the request is greater than this
    threshold, microbatching will be used. Otherwise, the request will be
    processed in a single batch."""

    disable_nccl_for_dp_synchronization: bool | None = None
    """Forces the dp synchronization logic in vllm/v1/worker/dp_utils.py
    to use Gloo instead of NCCL for its all reduce.

    Defaults to True when async scheduling is enabled, False otherwise.
    """

    ray_workers_use_nsight: bool = False
    """Whether to profile Ray workers with nsight, see https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html#profiling-nsight-profiler."""

    ray_runtime_env: RuntimeEnv | None = None
    """Ray runtime environment to pass to distributed workers."""

    placement_group: PlacementGroup | None = None
    """Ray placement group for distributed model workers."""

    distributed_executor_backend: (
        str | DistributedExecutorBackend | type[Executor] | None
    ) = None
    """
    Backend to use for distributed model workers: "ray", "mp"
    (multiprocessing), "uni", "external_launcher", or a custom `Executor`
    subclass. If the product of pipeline_parallel_size and tensor_parallel_size
    is less than or equal to the number of GPUs available, "mp" will be used to
    keep processing on a single host. Otherwise, an error will be raised. To use
    "mp" you must also set nnodes, and to use "ray" you must manually set
    distributed_executor_backend to "ray".

    Note:
        [TPU](https://docs.vllm.ai/projects/tpu/en/latest/) platform only supports Ray
        for distributed inference.
    """

    worker_cls: str = "auto"
    """The full name of the worker class to use. If "auto", the worker class
    will be determined based on the platform."""
    sd_worker_cls: str = "auto"
    """The full name of the worker class to use for speculative decoding.
    If "auto", the worker class will be determined based on the platform."""
    worker_extension_cls: str = ""
    """The full name of the worker extension class to use. The worker extension
    class is dynamically inherited by the worker class. This is used to inject
    new attributes and methods to the worker class for use in collective_rpc
    calls."""
    master_addr: str = "127.0.0.1"
    """Distributed master address for multi-node distributed
    inference when distributed_executor_backend is mp."""
    master_port: int = 29501
    """Distributed master port for multi-node distributed
    inference when distributed_executor_backend is mp."""
    node_rank: int = 0
    """Distributed node rank for multi-node distributed
    inference when distributed_executor_backend is mp."""
    nnodes: int = 1
    """Number of nodes for multi-node distributed
    inference when distributed_executor_backend is mp."""

    numa_bind: bool = False
    """Enable NUMA binding for GPU worker subprocesses."""
    numa_bind_nodes: list[int] | None = None
    """NUMA node to bind each GPU worker to. Requires `numa_bind=True`.

    Specify one NUMA node per visible GPU, for example `[0, 0, 1, 1]`
    for a 4-GPU system with GPUs 0-1 on NUMA node 0 and GPUs 2-3 on
    NUMA node 1. If unset and `numa_bind=True`, vLLM auto-detects the
    GPU-to-NUMA topology. The values are passed to `numactl --membind`
    and `--cpunodebind`, so they must be valid `numactl` NUMA node indices.
    """
    numa_bind_cpus: list[str] | None = None
    """Optional CPU lists to bind each GPU worker to. Requires `numa_bind=True`.

    Specify one CPU list per visible GPU, for example
    `["0-3", "4-7", "8-11", "12-15"]`. When set, vLLM uses
    `numactl --physcpubind` instead of `--cpunodebind`. This is useful
    for custom policies such as binding to PCT or other high-frequency cores.
    Each entry must use `numactl --physcpubind` CPU-list syntax, for example
    `"0-3"` or `"0,2,4-7"`.
    """

    distributed_timeout_seconds: int | None = None
    """Timeout in seconds for distributed operations (e.g., init_process_group).
    If set, this value is passed to torch.distributed.init_process_group as the
    timeout parameter. If None, PyTorch's default timeout is used (600s for NCCL).
    Increase this for multi-node setups where model downloads may be slow."""

    world_size: int = Field(init=False)
    """world_size is PPxTPxPCP (additionally multiplied by DP when
    distributed_executor_backend is "external_launcher"). It is computed in
    `__post_init__` and affects the number of workers we create."""

    rank: int = 0
    """Global rank in distributed setup."""

    _data_parallel_master_port_list: list[int] = Field(default_factory=list)
    """List of open ports auto-queried for data parallel messaging.
    Set to be private as it's not intended to be configured by users.
    """

    _coord_store_port: int = 0
    """Port of the coordination TCPStore. Can be set by the API server; workers
    connect as clients to exchange self-picked group ports at runtime. 0 means
    no coordination store is used, and DP ports are taken from
    `_data_parallel_master_port_list` / `data_parallel_master_port` instead."""

    decode_context_parallel_size: int = 1
    """Number of decode context parallel groups. Because the world size does
    not change with DCP, it simply reuses the GPUs of the TP group, and tp_size
    needs to be divisible by dcp_size."""

    dcp_kv_cache_interleave_size: int = 1
    """
    Interleave size of kv_cache storage while using DCP.
    dcp_kv_cache_interleave_size has been replaced by cp_kv_cache_interleave_size,
    and will be deprecated when PCP is fully supported.
    """
    dcp_comm_backend: DCPCommBackend = "ag_rs"
    """Communication backend for Decode Context Parallel (DCP).
    - "ag_rs": AllGather + ReduceScatter (default, existing behavior)
    - "a2a": All-to-All exchange of partial outputs + LSE, then
      combine with Triton kernel. Reduces NCCL calls from 3 to 2
      per layer for MLA models. Requires decode_context_parallel_size > 1.
    """

    cp_kv_cache_interleave_size: int = 1
    """Interleave size of kv_cache storage while using DCP or PCP.
    For `total_cp_rank = pcp_rank * dcp_world_size + dcp_rank`,
    and `total_cp_world_size = pcp_world_size * dcp_world_size`,
    store interleave_size tokens on total_cp_rank i,
    then store next interleave_size tokens on total_cp_rank i+1.
    Interleave_size=1: token-level alignment, where token `i` is stored on
        total_cp_rank `i % total_cp_world_size`.
    Interleave_size=block_size: block-level alignment, where tokens are
        first populated to the preceding ranks. Tokens are then stored
        in (rank i+1, block j) only after (rank i, block j) is fully occupied.
    Block_size should be greater than or equal to cp_kv_cache_interleave_size.
    Block_size should be divisible by cp_kv_cache_interleave_size.
    """

    data_parallel_index: int = Field(init=False)
    """Equal to the data parallel rank but not used for torch process groups
    and not overridden for dense models."""

    _api_process_count: int = Field(default=1, gt=0)
    """
    The number of API processes initialized.

    Note:
        This is an internal config that is only valid for and
        should only be set by API server scale-out.
    """

    _api_process_rank: int = Field(default=0, ge=-1)
    """
    The rank of this API process, or `-1` for engine core processes
    under API server scale-out.

    Note:
        This is an internal config that is only valid for and
        should only be set by API server scale-out.
    """

    @field_validator("disable_nccl_for_dp_synchronization", mode="wrap")
    @classmethod
    def _skip_none_validation(cls, value: Any, handler: Callable) -> Any:
        """Skip validation if the value is `None` when initialisation is delayed.

        `None` is returned unchanged. Any other value is passed to pydantic's
        inner `handler` for normal validation.
        """
        return None if value is None else handler(value)

    @field_validator("numa_bind_nodes")
    @classmethod
    def _validate_numa_bind_nodes(cls, value: list[int] | None) -> list[int] | None:
        """Check that an explicit per-GPU NUMA node list is usable.

        `None` (let vLLM auto-detect the topology) passes through untouched;
        otherwise the list must be non-empty and hold only non-negative ids.

        Raises:
            ValueError: if the list is empty or contains a negative node id.
        """
        if value is None:
            return value
        if len(value) == 0:
            raise ValueError("numa_bind_nodes must not be empty.")
        if min(value) < 0:
            raise ValueError("numa_bind_nodes must contain non-negative integers.")
        return value

    @field_validator("numa_bind_cpus")
    @classmethod
    def _validate_numa_bind_cpus(cls, value: list[str] | None) -> list[str] | None:
        """Validate per-GPU CPU lists intended for `numactl --physcpubind`.

        Each entry must be a comma-separated list of CPU ids and/or
        `start-end` ranges (e.g. "0-3" or "0,2,4-7"), and every range must be
        ascending. `None` passes through untouched.

        Raises:
            ValueError: if the list or any entry is empty, an entry does not
                match the accepted syntax, or a range is descending.
        """
        if value is None:
            return None
        if not value:
            raise ValueError("numa_bind_cpus must not be empty.")

        for cpuset in value:
            if not cpuset:
                raise ValueError("numa_bind_cpus entries must not be empty.")
            if not _NUMACTL_CPUSET_PATTERN.fullmatch(cpuset):
                raise ValueError(
                    "numa_bind_cpus entries must use numactl CPU list syntax, "
                    "for example '0-3' or '0,2,4-7'."
                )
            for part in cpuset.split(","):
                start_str, sep, end_str = part.partition("-")
                # Single CPU ids (no "-") have no range to check.
                if sep and int(start_str) > int(end_str):
                    raise ValueError(
                        f"numa_bind_cpus ranges must be ascending, but got '{cpuset}'."
                    )
        return value

    @model_validator(mode="after")
    def _validate_parallel_config(self) -> Self:
        """Cross-field validation, run after all fields have been set.

        Also rewrites the removed all2all backends ("pplx", "naive") to
        "allgather_reducescatter", logging a warning.

        Raises:
            ValueError: on inconsistent API-process, data-parallel, NUMA,
                EPLB or decode-context-parallel settings.
        """
        if self._api_process_rank >= self._api_process_count:
            raise ValueError(
                "Invalid value of `_api_process_rank`. "
                f"Expected to be `-1` or `[0, {self._api_process_count})`, "
                f"but found: {self._api_process_rank}"
            )

        if self.all2all_backend in ("pplx", "naive"):
            logger.warning(
                "The '%s' all2all backend has been removed. "
                "Falling back to 'allgather_reducescatter'.",
                self.all2all_backend,
            )
            self.all2all_backend = "allgather_reducescatter"

        if self.data_parallel_size_local > self.data_parallel_size:
            raise ValueError(
                f"data_parallel_size_local ({self.data_parallel_size_local}) "
                f"must be <= data_parallel_size ({self.data_parallel_size})"
            )

        if self.data_parallel_size <= 1 and self.data_parallel_external_lb:
            raise ValueError(
                "data_parallel_external_lb can only be set when data_parallel_size > 1"
            )

        if not self.numa_bind and (
            self.numa_bind_nodes is not None or self.numa_bind_cpus is not None
        ):
            raise ValueError(
                "numa_bind_nodes and numa_bind_cpus require numa_bind=True."
            )

        if self.enable_eplb:
            if not current_platform.is_cuda_alike():
                raise ValueError(
                    "Expert parallelism load balancing is only supported on "
                    "CUDA devices or ROCm devices now."
                )
            if not self.enable_expert_parallel:
                raise ValueError("enable_expert_parallel must be True to use EPLB.")
            if self.tensor_parallel_size * self.data_parallel_size <= 1:
                raise ValueError(
                    "EPLB requires tensor_parallel_size or data_parallel_size "
                    "to be greater than 1, but got "
                    f"TP={self.tensor_parallel_size},DP={self.data_parallel_size}."
                )
        elif self.eplb_config.num_redundant_experts != 0:
            raise ValueError(
                "num_redundant_experts is set to "
                f"{self.eplb_config.num_redundant_experts} but EPLB is not "
                "enabled. Either enable EPLB or unset "
                "num_redundant_experts."
            )

        # Note(hc): In the current implementation of decode context
        # parallel(DCP), tp_size needs to be divisible by dcp_size,
        # because the world size does not change by dcp, it simply
        # reuses the GPUs of TP group, and split one TP group into
        # tp_size//dcp_size DCP groups.
        if self.tensor_parallel_size % self.decode_context_parallel_size != 0:
            raise ValueError(
                f"tp_size={self.tensor_parallel_size} must be divisible by "
                f"dcp_size={self.decode_context_parallel_size}."
            )

        if self.dcp_comm_backend == "a2a" and self.decode_context_parallel_size <= 1:
            raise ValueError(
                "dcp_comm_backend='a2a' requires decode_context_parallel_size > 1."
            )

        return self

    @property
    def world_size_across_dp(self) -> int:
        """world_size_across_dp is world_size (PPxTPxPCP) times DP, i.e. the
        size of the world including data parallelism."""
        # NOTE(review): with distributed_executor_backend="external_launcher",
        # __post_init__ has already multiplied world_size by DP; confirm that
        # multiplying by DP again here is intended for that backend.
        return self.world_size * self.data_parallel_size

    @property
    def use_ubatching(self) -> bool:
        """Whether batches are split into microbatches (DBO, or ubatch_size > 1)."""
        if self.enable_dbo:
            return True
        return self.ubatch_size > 1

    @property
    def num_ubatches(self) -> int:
        """Number of microbatches per batch.

        Dual batch overlap always uses 2; otherwise this is `ubatch_size`
        (0 or 1 when microbatching is disabled).
        """
        return 2 if self.enable_dbo else self.ubatch_size

    @property
    def local_engines_only(self) -> bool:
        """
        Client manages local+remote EngineCores in pure internal LB case.
        Client manages local EngineCores in hybrid and external LB case.

        Returns True in the hybrid or external LB case.
        """
        return self.data_parallel_external_lb or self.data_parallel_hybrid_lb

    def get_next_dp_init_port(self) -> int:
        """
        We might need to initialize process groups in multiple
        processes that are related to data parallelism,
        e.g. both in the worker and in the engine, which
        can live in different processes. To avoid port conflicts, we
        pop a new port from the prepared port list each time we need to
        initialize a new process group related to data parallelism.

        Once the prepared list is exhausted, `data_parallel_master_port` is
        returned and then incremented, so successive calls still yield
        distinct ports.
        """
        if self._data_parallel_master_port_list:
            return self._data_parallel_master_port_list.pop()

        answer = self.data_parallel_master_port
        self.data_parallel_master_port += 1
        return answer

    def _pick_stateless_dp_port(self) -> tuple[int, socket.socket | None]:
        """Return ``(port, listen_socket)`` for DP group init.

        With a coord store, rank 0 binds a socket and publishes the port;
        others read it.  Without one, pops a pre-allocated port and
        returns ``listen_socket=None``.

        On rank 0 the returned socket is already bound and listening, and the
        caller takes ownership of it. If binding or publishing fails, the
        socket is closed before the error propagates.
        """
        if not self._coord_store_port:
            return self.get_next_dp_init_port(), None

        from vllm.distributed.utils import get_cached_tcp_store_client

        store = get_cached_tcp_store_client(
            self.data_parallel_master_ip, self._coord_store_port
        )

        key = "dp_master_port"
        if self.data_parallel_rank != 0:
            # Presumably blocks until rank 0 has published the key
            # (torch Store.get waits for missing keys) -- verify store type.
            return int(store.get(key).decode()), None

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Binding to port 0 lets the OS choose a free port atomically.
            s.bind((self.data_parallel_master_ip, 0))
            s.listen()
            port = s.getsockname()[1]
            store.set(key, str(port).encode())
        except BaseException:
            # Don't leak the bound socket if binding or publishing fails.
            s.close()
            raise
        return port, s

    @overload
    def stateless_init_dp_group(
        self, return_store: Literal[False] = ...
    ) -> ProcessGroup: ...
    @overload
    def stateless_init_dp_group(
        self, return_store: Literal[True]
    ) -> tuple[ProcessGroup, Store]: ...
    def stateless_init_dp_group(
        self, return_store: bool = False
    ) -> ProcessGroup | tuple[ProcessGroup, Store]:
        """Create a stateless gloo process group spanning the DP ranks.

        Args:
            return_store: if True, also return the underlying `Store`.

        Returns:
            The process group, or `(process_group, store)` when
            `return_store` is True.

        Raises:
            DistNetworkError: immediately for failures other than EADDRINUSE,
                or the last EADDRINUSE error once all retries are exhausted.
        """
        # NOTE: In high-concurrency scenarios multiple processes
        # can pick the same (currently free) port through a race
        # condition when calling `get_open_port()`. When the first
        # process binds the port the others will subsequently fail
        # with `torch.distributed.DistNetworkError: EADDRINUSE`.
        # To make the initialization more robust we retry a few times
        # with a fresh port whenever this specific error is observed.
        from torch.distributed import DistNetworkError

        from vllm.distributed.utils import (
            stateless_init_torch_distributed_process_group,
        )

        max_retries = 5
        last_exc: Exception | None = None
        for _ in range(max_retries):
            try:
                port, listen_socket = self._pick_stateless_dp_port()
                # use gloo since the engine process might not have cuda device
                return stateless_init_torch_distributed_process_group(
                    self.data_parallel_master_ip,
                    port,
                    self.data_parallel_rank,
                    self.data_parallel_size,
                    backend="gloo",
                    return_store=return_store,
                    listen_socket=listen_socket,
                )
            except DistNetworkError as e:
                # We only want to retry when the root cause is EADDRINUSE.
                if "EADDRINUSE" not in str(e):
                    raise
                logger.warning("Address already in use. Retrying with a new port.")
                last_exc = e

        # If we get here all retries have failed.
        assert last_exc is not None
        raise last_exc

    # The all_reduce at the end of attention (during o_proj) means that
    # inputs are replicated across each rank of the tensor parallel group.
    # If using expert parallelism with one of the all2all backends listed
    # below (e.g. DeepEP), replicated tokens result in useless duplicate
    # computation and communication.
    #
    # In this case, ensure the input to the experts is sequence parallel
    # to avoid the excess work.
    @property
    def use_sequence_parallel_moe(self) -> bool:
        """Whether MoE inputs should be made sequence parallel across TP ranks.

        True only when expert parallelism is enabled with TP > 1 and DP > 1,
        and the all2all backend is one of the listed backends.
        """
        return (
            self.all2all_backend
            in (
                "allgather_reducescatter",
                "deepep_high_throughput",
                "deepep_low_latency",
                "mori",
                "nixl_ep",
            )
            and self.enable_expert_parallel
            and self.tensor_parallel_size > 1
            and self.data_parallel_size > 1
        )

    @property
    def node_rank_within_dp(self) -> int:
        """Rank of this node within its group of `nnodes_within_dp` nodes."""
        group_size = self.nnodes_within_dp
        return self.node_rank % group_size

    @property
    def nnodes_within_dp(self) -> int:
        """Number of nodes in each group of nodes sharing local DP ranks.

        Equal to ``nnodes // (data_parallel_size // data_parallel_size_local)``
        for multi-node setups, and always 1 on a single node.
        """
        if self.nnodes != 1:
            # NOTE(review): raises ZeroDivisionError if data_parallel_size_local
            # is 0, and yields 0 if there are more DP node groups than nodes;
            # confirm callers never reach this with such values.
            dp_node_groups = self.data_parallel_size // self.data_parallel_size_local
            return self.nnodes // dp_node_groups
        return 1

    @property
    def local_world_size(self) -> int:
        """Number of workers per node: `world_size` divided evenly across the
        `nnodes_within_dp` nodes (floor division)."""
        return self.world_size // self.nnodes_within_dp

    @staticmethod
    def has_unfinished_dp(dp_group: ProcessGroup, has_unfinished: bool) -> bool:
        """Return True if any rank in `dp_group` reports unfinished work.

        This is a collective all_reduce, so every rank in `dp_group` must call it.
        """
        tensor = torch.tensor([has_unfinished], dtype=torch.int32, device="cpu")
        # dp rank 0: has_unfinished_seqs=True
        # dp rank 1: has_unfinished_seqs=False
        # aggregated: has_unfinished_seqs=True
        # so this is an OR operation, i.e. MAX in integers
        torch.distributed.all_reduce(tensor, op=ReduceOp.MAX, group=dp_group)
        aggregated_has_unfinished = bool(tensor.item())
        return aggregated_has_unfinished

    @staticmethod
    def sync_kv_cache_memory_size(dp_group: ProcessGroup, kv_cache_memory: int) -> int:
        """Return the minimum `kv_cache_memory` across all ranks in `dp_group`.

        A value of -1 is mapped to the int64 maximum so it never lowers the MIN
        reduction. If every rank passes -1, the int64 maximum (not -1) is
        returned. This is a collective, so every rank in `dp_group` must call it.
        """
        if kv_cache_memory == -1:
            kv_cache_memory = torch.iinfo(torch.int64).max
        tensor = torch.tensor([kv_cache_memory], dtype=torch.int64, device="cpu")
        # we cannot use broadcast for stateless dp group since it depends
        # on global rank
        torch.distributed.all_reduce(tensor, op=ReduceOp.MIN, group=dp_group)
        return tensor.item()

    def compute_hash(self):
        """
        Provide a hash that uniquely identifies all the configs
        that affect the structure of the computation
        graph from input ids/embeddings to the final hidden states,
        excluding anything before input ids/embeddings and after
        the final hidden states.

        This hash is also used for DP worker configuration validation
        to prevent hangs from mismatched collective communication patterns.
        """
        ignored_factors = {
            # Derived/runtime topology, networking, or launch details
            "data_parallel_rank",
            "data_parallel_rank_local",
            "data_parallel_size_local",
            "data_parallel_index",
            "data_parallel_backend",
            "data_parallel_external_lb",
            "data_parallel_hybrid_lb",
            "data_parallel_master_ip",
            "data_parallel_master_port",
            "_data_parallel_master_port_list",
            "data_parallel_rpc_port",
            "_coord_store_port",
            "rank",
            "master_addr",
            "master_port",
            "node_rank",
            "nnodes",
            "distributed_timeout_seconds",
            "numa_bind",
            "numa_bind_nodes",
            "numa_bind_cpus",
            "max_parallel_loading_workers",
            "enable_ep_weight_filter",
            "disable_custom_all_reduce",
            "ray_workers_use_nsight",
            "ray_runtime_env",
            "placement_group",
            "distributed_executor_backend",
            "worker_cls",
            "sd_worker_cls",
            "worker_extension_cls",
            "_api_process_count",
            "_api_process_rank",
        }

        from vllm.config.utils import get_hash_factors, hash_factors

        factors = get_hash_factors(self, ignored_factors)
        return hash_factors(factors)

    def __post_init__(self) -> None:
        """Derive runtime parallel settings from the user-supplied fields.

        Computes ``world_size`` and validates elastic-EP constraints. Resolves
        the data-parallel rank and master port, or, when data parallelism was
        not requested via engine args, reads them from the ``VLLM_DP_*``
        environment variables (offline SPMD case). Picks a default distributed
        executor backend when none was given, and chooses an EPLB communicator
        when EPLB is enabled without one.

        Raises:
            ValueError: on invalid elastic-EP, data-parallel or multi-node
                settings; when ``external_launcher`` is used with data
                parallelism but ``RANK`` is not set; or when the default
                backend selection finds too few CUDA devices on this node.
            NotImplementedError: if elastic EP is combined with external or
                hybrid data-parallel load balancing.
        """
        self.world_size = (
            self.pipeline_parallel_size
            * self.tensor_parallel_size
            * self.prefill_context_parallel_size
        )

        if self.distributed_executor_backend == "external_launcher":
            logger.info("Using external launcher for distributed inference.")
            # With an external launcher, world_size also counts the DP replicas.
            self.world_size *= self.data_parallel_size

        if self.enable_elastic_ep:
            if not self.enable_eplb:
                raise ValueError("Elastic EP is only supported with enable_eplb=True.")
            if self.pipeline_parallel_size > 1:
                raise ValueError(
                    "Elastic EP is not supported with pipeline parallelism "
                    f"(pipeline_parallel_size={self.pipeline_parallel_size})."
                )
            if self.data_parallel_external_lb or self.data_parallel_hybrid_lb:
                raise NotImplementedError(
                    "Elastic EP is not compatible with data_parallel_external_lb "
                    "or data_parallel_hybrid_lb. Elastic EP relies on a single API "
                    "server and core client to coordinate scale up/down."
                )

        if self.data_parallel_size > 1 or self.data_parallel_size_local == 0:
            # Data parallel was specified in the engine args.
            if self.distributed_executor_backend == "external_launcher":
                # For external launcher, derive the DP rank from the global
                # RANK env var: each consecutive block of
                # world_size // data_parallel_size ranks maps to one DP rank.
                global_rank = os.environ.get("RANK")
                if global_rank is None:
                    raise ValueError(
                        "The RANK environment variable must be set to derive "
                        "data_parallel_rank when using "
                        "distributed_executor_backend='external_launcher' "
                        "with data parallelism."
                    )
                self.data_parallel_rank = int(global_rank) // (
                    self.world_size // self.data_parallel_size
                )
                logger.info(
                    "Set data_parallel_rank to %d automatically.",
                    self.data_parallel_rank,
                )

            if not self.enable_elastic_ep:
                # Take the DP master port from a cached list of open ports,
                # fetching five new ones whenever the list is empty.
                if not self._data_parallel_master_port_list:
                    self._data_parallel_master_port_list = get_open_ports_list(5)
                self.data_parallel_master_port = (
                    self._data_parallel_master_port_list.pop()
                )

            if not (0 <= self.data_parallel_rank < self.data_parallel_size):
                raise ValueError(
                    f"data_parallel_rank ({self.data_parallel_rank})"
                    f" must be in the range [0, {self.data_parallel_size})"
                )
        else:
            # Otherwise fall back to env vars (e.g. for offline SPMD case).
            self.data_parallel_size = envs.VLLM_DP_SIZE
            self.data_parallel_rank = envs.VLLM_DP_RANK
            self.data_parallel_rank_local = envs.VLLM_DP_RANK_LOCAL
            self.data_parallel_master_ip = envs.VLLM_DP_MASTER_IP
            self.data_parallel_master_port = envs.VLLM_DP_MASTER_PORT

            if self.data_parallel_size > 1 and self.is_moe_model is False:
                raise ValueError(
                    "Offline data parallel mode is not supported/useful"
                    " for dense models."
                )

        self.data_parallel_index = self.data_parallel_rank

        if self.distributed_executor_backend == "external_launcher":
            os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0"
            logger.info("Disabling V1 multiprocessing for external launcher.")

        if self.distributed_executor_backend is None and self.world_size_across_dp > 1:
            backend = self._select_default_executor_backend()
            self.distributed_executor_backend = backend
            logger.debug("Defaulting to use %s for distributed inference", backend)

        if self.distributed_executor_backend is None and self.world_size == 1:
            self.distributed_executor_backend = "uni"

        if self.max_parallel_loading_workers is not None:
            logger.warning(
                "max_parallel_loading_workers is currently "
                "not supported and will be ignored."
            )

        allowed_backends = ("mp", "uni", "external_launcher")
        if (
            self.distributed_executor_backend not in allowed_backends
            and self.nnodes > 1
        ):
            raise ValueError(
                "nnodes > 1 can only be set when distributed executor "
                "backend is mp, uni or external_launcher."
            )

        if self.enable_eplb and self.eplb_config.communicator is None:
            if self.enable_elastic_ep:
                # Elastic EP requires stateless mode
                # (torch.distributed.batch_isend_irecv doesn't
                # support stateless mode), so we use PyNCCL backend
                self.eplb_config.communicator = "pynccl"
            elif self.eplb_config.use_async:
                # Torch Gloo is a backend that allows avoiding hangs
                # due to NCCL multi-thread conflicts in async EPLB
                self.eplb_config.communicator = "torch_gloo"
            else:
                self.eplb_config.communicator = "torch_nccl"

    def _select_default_executor_backend(self) -> DistributedExecutorBackend:
        """Choose the executor backend to use when none was configured.

        Multiprocessing (``"mp"``) is the default when world_size fits on the
        current node and we aren't in a Ray placement group. ``"uni"`` is used
        for TPU with ``VLLM_XLA_USE_SPMD``. ``"ray"`` is used when the DP
        backend is Ray, when a placement group was supplied, or when Ray is
        already initialized inside a placement group.

        Raises:
            ValueError: if, on a single CUDA node, fewer devices are visible
                than ``world_size``.
        """
        from vllm.v1.executor import ray_utils

        # Probed up front (as before) regardless of which branch wins.
        ray_found = ray_utils.ray_is_available()

        if current_platform.is_tpu() and envs.VLLM_XLA_USE_SPMD:
            return "uni"
        if current_platform.is_cuda() and self.nnodes > 1:
            return "mp"
        if (
            current_platform.is_cuda()
            and current_platform.device_count() < self.world_size
        ):
            gpu_count = current_platform.device_count()
            raise ValueError(
                f"World size ({self.world_size}) is larger than the number of "
                f"available GPUs ({gpu_count}) in this node. If this is "
                "intentional and you are using:\n"
                "- ray, set '--distributed-executor-backend ray'.\n"
                "- multiprocessing, set '--nnodes' appropriately."
            )
        if self.data_parallel_backend == "ray":
            logger.info(
                "Using ray distributed inference because "
                "data_parallel_backend is ray"
            )
            return "ray"
        if ray_found:
            if self.placement_group:
                return "ray"
            from ray import is_initialized as ray_is_initialized

            if ray_is_initialized():
                from ray.util import get_current_placement_group

                if get_current_placement_group():
                    return "ray"
        return "mp"

    @property
    def use_ray(self) -> bool:
        """Whether the configured executor backend runs on Ray.

        True for the ``"ray"`` backend string. For an executor class, this is
        the class's ``uses_ray`` attribute (``False`` when it has none).
        """
        backend = self.distributed_executor_backend
        if backend == "ray":
            return True
        return isinstance(backend, type) and getattr(backend, "uses_ray", False)

    @model_validator(mode="after")
    def _verify_args(self) -> Self:
        """Validate cross-field parallel settings after model construction.

        Also applies derived defaults. Custom all-reduce is disabled when
        ``VLLM_BATCH_INVARIANT`` is set, when the current platform does not
        support it, or when ``nnodes > 1``. When a Ray backend is selected,
        Ray availability is checked via ``ray_utils.assert_ray_available()``.

        Returns:
            ``self``, as required for an ``"after"`` model validator.

        Raises:
            ValueError: if ``distributed_executor_backend`` is not ``None``, a
                string, or an ``Executor`` subclass; or if
                ``ray_workers_use_nsight`` is set without a Ray backend.
        """
        # Lazy import to avoid circular import
        from vllm.v1.executor import Executor

        # Enable batch invariance settings if requested
        if envs.VLLM_BATCH_INVARIANT:
            self.disable_custom_all_reduce = True

        # Strings (built-in names or import paths) are accepted here as-is;
        # any other non-None value must be an Executor subclass.
        if (
            self.distributed_executor_backend is not None
            and not isinstance(self.distributed_executor_backend, str)
            and not (
                isinstance(self.distributed_executor_backend, type)
                and issubclass(self.distributed_executor_backend, Executor)
            )
        ):
            raise ValueError(
                "Unrecognized distributed executor backend "
                f"{self.distributed_executor_backend}. Supported "
                "values are 'ray', 'mp', 'uni', 'external_launcher', "
                "a custom Executor subclass or its import path."
            )
        if self.use_ray:
            from vllm.v1.executor import ray_utils

            ray_utils.assert_ray_available()

        if not current_platform.use_custom_allreduce():
            self.disable_custom_all_reduce = True
            logger.debug(
                "Disabled the custom all-reduce kernel because it is not "
                "supported on current platform."
            )
        if self.nnodes > 1:
            self.disable_custom_all_reduce = True
            logger.debug(
                "Disabled the custom all-reduce since we are running on multi-node."
            )
        if self.ray_workers_use_nsight and not self.use_ray:
            raise ValueError(
                "Unable to use nsight profiling unless workers run with Ray."
            )

        return self