import os
from collections import defaultdict
from itertools import islice, repeat
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

import vllm.envs as envs
from vllm.config import VllmConfig
from vllm.logger import init_logger
from vllm.utils import get_distributed_init_method, get_ip, get_open_port
from vllm.v1.executor.abstract import Executor
from vllm.v1.executor.ray_utils import (RayWorkerWrapper,
                                        initialize_ray_cluster, ray)
from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec
from vllm.v1.outputs import ModelRunnerOutput

if ray is not None:
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

if TYPE_CHECKING:
    from ray.util.placement_group import PlacementGroup

logger = init_logger(__name__)


class RayExecutor(Executor):

    def __init__(self, vllm_config: VllmConfig) -> None:
        """
        Store the configuration, initialize Ray, and start the workers.

        Args:
            vllm_config: Engine configuration. Its `parallel_config` and
                `model_config` are also cached as attributes.
        """
        self.vllm_config = vllm_config
        self.parallel_config = vllm_config.parallel_config
        self.model_config = vllm_config.model_config
        # Built lazily by `execute_model` on its first call.
        self.forward_dag: Optional[ray.dag.CompiledDAG] = None

        # Disable Ray usage stats collection, unless the environment
        # explicitly enables it with RAY_USAGE_STATS_ENABLED=1.
        ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0")
        if ray_usage != "1":
            os.environ["RAY_USAGE_STATS_ENABLED"] = "0"

        initialize_ray_cluster(self.parallel_config)
        # Read only after `initialize_ray_cluster` has run.
        placement_group = self.parallel_config.placement_group

        # Create the parallel GPU workers.
        self._init_workers_ray(placement_group)

    def _init_workers_ray(self, placement_group: "PlacementGroup",
                          **ray_remote_kwargs):
        """
        Launch the Ray workers and drive them through start-up.

        One `RayWorkerWrapper` actor is created for each placement-group
        bundle that has a GPU. The actors are then ordered (driver node
        first), handed their environment variables, and finally asked to
        run `init_worker`, `initialize` and `load_model`.

        Args:
            placement_group: Placement group whose bundles host the workers.
            **ray_remote_kwargs: Extra options forwarded to `ray.remote`.

        Raises:
            RuntimeError: If the number of distinct node ids differs from
                the number of distinct worker IP addresses.
        """
        # Actor handles for the workers that run the model.
        self.workers: List[RayWorkerWrapper] = []
        if self.parallel_config.ray_workers_use_nsight:
            ray_remote_kwargs = self._configure_ray_workers_use_nsight(
                ray_remote_kwargs)

        # Spawn an actor for every bundle that offers a GPU; bundles
        # without one are ignored because each worker requests one GPU.
        driver_ip = get_ip()
        for bundle_index, bundle_spec in enumerate(
                placement_group.bundle_specs):
            if bundle_spec.get("GPU", 0):
                strategy = PlacementGroupSchedulingStrategy(
                    placement_group=placement_group,
                    placement_group_capture_child_tasks=True,
                    placement_group_bundle_index=bundle_index,
                )
                remote_worker_cls = ray.remote(
                    num_cpus=0,
                    num_gpus=1,
                    scheduling_strategy=strategy,
                    **ray_remote_kwargs,
                )(RayWorkerWrapper)
                self.workers.append(
                    remote_worker_cls.remote(vllm_config=self.vllm_config))

        logger.debug("workers: %s", self.workers)

        # Ask every actor which node IP it landed on.
        worker_ips = []
        for actor in self.workers:
            node_ip = ray.get(
                actor.get_node_ip.remote())  # type: ignore[attr-defined]
            worker_ips.append(node_ip)

        workers_per_ip: Dict[str, int] = defaultdict(int)
        for node_ip in worker_ips:
            workers_per_ip[node_ip] += 1

        ip_of_worker = dict(zip(self.workers, worker_ips))

        def placement_key(actor):
            """
            Ordering key for a worker, compared field by field:
            1. Workers sharing a node with the driver (vllm engine)
                come first.
            2. Next, workers on nodes hosting fewer workers come first.
            3. Last, the node IP itself breaks ties so that the final
                order is deterministic.
            """
            node_ip = ip_of_worker[actor]
            return (node_ip != driver_ip, workers_per_ip[node_ip], node_ip)

        # Once sorted, workers of the same node are adjacent and the
        # driver node's workers lead the list.
        self.workers = sorted(self.workers, key=placement_key)

        # Collect the (node id, GPU ids) pair reported by each worker.
        worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids")

        node_workers = defaultdict(list)  # node id -> worker ranks
        node_gpus = defaultdict(list)  # node id -> gpu ids

        for rank, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids):
            node_workers[node_id].append(rank)
            # GPU ids may arrive as strings or integers. Normalise to
            # integers: ids can exceed 9 (e.g. 16 GPUs), so sorting them
            # as strings would give the wrong order.
            # see https://github.com/vllm-project/vllm/issues/5590
            node_gpus[node_id].extend(int(gpu_id) for gpu_id in gpu_ids)

        for node_id in node_gpus:
            node_gpus[node_id].sort()

        all_ips = set(worker_ips)
        n_ips = len(all_ips)
        n_nodes = len(node_workers)

        if n_nodes != n_ips:
            raise RuntimeError(
                f"Every node should have a unique IP address. Got {n_nodes}"
                f" nodes with node ids {list(node_workers.keys())} and "
                f"{n_ips} unique IP addresses {all_ips}. Please check your"
                " network configuration. If you set `VLLM_HOST_IP` or "
                "`HOST_IP` environment variable, make sure it is unique for"
                " each node.")

        # Build one single-element argument tuple per worker, holding the
        # environment variables that worker must apply.
        all_args_to_update_environment_variables = []
        for node_id, _ in worker_node_and_gpu_ids:
            worker_env = {
                "CUDA_VISIBLE_DEVICES":
                ",".join(str(gpu_id) for gpu_id in node_gpus[node_id]),
                "VLLM_TRACE_FUNCTION":
                str(envs.VLLM_TRACE_FUNCTION),
                "VLLM_USE_V1":
                str(int(envs.VLLM_USE_V1)),
            }
            if envs.VLLM_ATTENTION_BACKEND is not None:
                worker_env["VLLM_ATTENTION_BACKEND"] = (
                    envs.VLLM_ATTENTION_BACKEND)
            all_args_to_update_environment_variables.append((worker_env, ))

        self._env_vars_for_all_workers = (
            all_args_to_update_environment_variables)

        self._run_workers("update_environment_variables",
                          all_args=self._get_env_vars_to_be_updated())

        if len(node_gpus) == 1:
            # Single-node case: no real IP address is needed, the
            # loopback address is enough.
            # NOTE: a node can own several IP addresses, one per network
            # interface. `get_ip()` may return any of them, and with a
            # complicated network setup that address might not work for
            # traffic inside the node. The loopback address always does.
            driver_ip = "127.0.0.1"
        distributed_init_method = get_distributed_init_method(
            driver_ip, get_open_port())

        # Initialize the actual workers inside the worker wrappers. The
        # local rank is the worker's position among its node's workers.
        init_worker_all_kwargs = []
        for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids):
            init_worker_all_kwargs.append(
                self._get_worker_kwargs(
                    local_rank=node_workers[node_id].index(rank),
                    rank=rank,
                    distributed_init_method=distributed_init_method,
                ))
        self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs)
        self._run_workers("initialize")
        self._run_workers("load_model")

    def _configure_ray_workers_use_nsight(self,
                                          ray_remote_kwargs) -> Dict[str, Any]:
        # If nsight profiling is enabled, we need to set the profiling
        # configuration for the ray workers as runtime env.
        runtime_env = ray_remote_kwargs.setdefault("runtime_env", {})
        runtime_env.update({
            "nsight": {
                "t": "cuda,cudnn,cublas",
                "o": "'worker_process_%p'",
                "cuda-graph-trace": "node",
            }
        })

        return ray_remote_kwargs

    def _get_env_vars_to_be_updated(self):
        return self._env_vars_for_all_workers

    def _get_worker_kwargs(
            self,
            local_rank: int = 0,
            rank: int = 0,
            distributed_init_method: Optional[str] = None) -> Dict[str, Any]:
        """
        Return worker init args for a given rank.
        """
        if distributed_init_method is None:
            distributed_init_method = get_distributed_init_method(
                get_ip(), get_open_port())
        return dict(
            vllm_config=self.vllm_config,
            local_rank=local_rank,
            rank=rank,
            distributed_init_method=distributed_init_method,
        )

    def determine_available_memory(self) -> int:
        """
        Determine the available GPU memory in bytes.

        This invokes `determine_available_memory` on each worker and takes
        the min of the results, guaranteeing that the selected cache sizes are
        compatible with all workers.
        """
        memory_sizes = self._run_workers("determine_available_memory")

        # Since we use a shared centralized controller, we take the minimum
        # memory size across all workers to make sure all the memory
        # operators can be applied to all workers.
        return min(memory_sizes)
Rui Qiao's avatar
Rui Qiao committed
    def initialize(self, kv_cache_config: KVCacheConfig) -> None:
        """
        Initialize the KV cache in all workers.

        Every worker first runs `initialize_cache` with the given
        configuration and then `compile_or_warm_up_model`.
        """
        self._run_workers("initialize_cache", kv_cache_config)
        self._run_workers("compile_or_warm_up_model")

    def get_kv_cache_spec(self) -> KVCacheSpec:
        """
        Return the KV cache spec required by the model.

        `get_kv_cache_spec` is invoked on every worker; the results are
        asserted to be equal to one another and the first one is returned.
        """
        specs = self._run_workers("get_kv_cache_spec")
        reference_spec = specs[0]
        assert all(spec == reference_spec for spec in specs)
        return reference_spec

    def _run_workers(
        self,
        method: str,
        *args,
        all_args: Optional[List[Tuple[Any, ...]]] = None,
        all_kwargs: Optional[List[Dict[str, Any]]] = None,
        **kwargs,
    ) -> Any:
        """
        Runs the given method on all workers. Can be used in the following
        ways:

        Args:
        - args/kwargs: All workers share the same args/kwargs
        - all_args/all_kwargs: args/kwargs for each worker are specified
          individually
        """
        count = len(self.workers)
        all_worker_args = repeat(args, count) if all_args is None \
            else islice(all_args, 0, None)
        all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \
            else islice(all_kwargs, 0, None)

        ray_worker_refs = [
            worker.execute_method.remote(  # type: ignore[attr-defined]
                method, *worker_args, **worker_kwargs)
            for (worker, worker_args, worker_kwargs
                 ) in zip(self.workers, all_worker_args, all_worker_kwargs)
        ]
        return ray.get(ray_worker_refs)

    def execute_model(
        self,
        scheduler_output,
    ) -> ModelRunnerOutput:
        """
        Run one step through the compiled Ray DAG and return its result.

        The DAG is compiled on the first call and reused afterwards.
        """
        dag = self.forward_dag
        if dag is None:
            dag = self.forward_dag = self._compiled_ray_dag()
        per_worker_outputs = ray.get(dag.execute(scheduler_output))
        # Only the first worker (rank 0) produces the execution result;
        # the remaining workers return None.
        return per_worker_outputs[0]

    def profile(self, is_start=True):
        """Profiling is not implemented for this executor."""
        raise NotImplementedError()

    def shutdown(self):
        """
        Tear down the compiled DAG and kill the workers.

        Nothing happens when no DAG has been compiled, including when the
        `forward_dag` attribute was never set.
        """
        dag = getattr(self, "forward_dag", None)
        if dag is None:
            return
        dag.teardown()
        import ray
        for actor in self.workers:
            ray.kill(actor)
        self.forward_dag = None

    def check_health(self) -> None:
        """
        Log that a health check was requested.

        No worker is contacted here; the method only emits a debug log.
        """
        logger.debug("Called check_health.")

    def _check_ray_compiled_graph_installation(self):
        """
        Verify that Ray Compiled Graph can be used.

        Checks, in order, that the installed Ray version is at least 2.39,
        that the `ray.experimental.compiled_dag_ref` module can be found,
        and that `cupy` is available when
        `VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL` is set.

        Raises:
            ValueError: If any of the checks fails.
        """
        # `importlib.metadata` replaces the deprecated `pkg_resources` API
        # for looking up the installed distribution version.
        import importlib.metadata
        import importlib.util

        from packaging import version

        required_version = version.parse("2.39")
        current_version = version.parse(importlib.metadata.version("ray"))
        if current_version < required_version:
            raise ValueError(f"Ray version {required_version} is "
                             f"required, but found {current_version}")

        raycg = importlib.util.find_spec("ray.experimental.compiled_dag_ref")
        if raycg is None:
            raise ValueError("Ray Compiled Graph is not installed. "
                             "Run `pip install ray[adag]` to install it.")

        cupy_spec = importlib.util.find_spec("cupy")
        if cupy_spec is None and envs.VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL:
            raise ValueError(
                "cupy is not installed but required since "
                "VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL is set. "
                "Run `pip install ray[adag]` and check cupy installation.")

    def _compiled_ray_dag(self):
        """
        Build and compile the Ray DAG used for model execution.

        The same DAG input is bound to `execute_model` on every worker,
        and the per-worker nodes are gathered into one multi-output node
        before compilation.
        """
        assert self.parallel_config.use_ray
        self._check_ray_compiled_graph_installation()
        from ray.dag import InputNode, MultiOutputNode

        with InputNode() as dag_input:
            worker_nodes = [
                actor.execute_model.bind(  # type: ignore[attr-defined]
                    dag_input) for actor in self.workers
            ]
            combined_dag = MultiOutputNode(worker_nodes)

        compiled_dag = combined_dag.experimental_compile()
        return compiled_dag

    def __del__(self):
        """Call `shutdown` when the executor object is finalized."""
        self.shutdown()