ray_xpu_executor.py 1.39 KB
Newer Older
1
import asyncio
2
from typing import List, Optional
3

4
5
import ray

6
import vllm.envs as envs
7
8
from vllm.executor.ray_gpu_executor import RayGPUExecutor, RayGPUExecutorAsync
from vllm.executor.xpu_executor import XPUExecutor
9
from vllm.logger import init_logger
10
from vllm.utils import make_async
11
12
13
14

logger = init_logger(__name__)


15
class RayXPUExecutor(RayGPUExecutor, XPUExecutor):
16

17
    def _get_env_vars_to_be_updated(self):
18
        # Get the set of GPU IDs used on each node.
19
20
21
22
23
24
25
        worker_node_and_gpu_ids = []
        for worker in [self.driver_dummy_worker] + self.workers:
            if worker is None:
                # driver_dummy_worker can be None when using ray spmd worker.
                continue
            worker_node_and_gpu_ids.append(
                ray.get(worker.get_node_and_gpu_ids.remote()))  # type: ignore
26

27
28
29
30
31
32
        # Set environment variables for the driver and workers.
        all_args_to_update_environment_variables = [({
            "VLLM_TRACE_FUNCTION":
            str(envs.VLLM_TRACE_FUNCTION),
        }, ) for (_, _) in worker_node_and_gpu_ids]
        return all_args_to_update_environment_variables
33
34


35
class RayXPUExecutorAsync(RayXPUExecutor, RayGPUExecutorAsync):
    """Async variant of ``RayXPUExecutor``.

    Mixes ``RayXPUExecutor`` (which supplies the XPU-specific
    ``_get_env_vars_to_be_updated``) with ``RayGPUExecutorAsync``.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the executor and set up async driver execution.

        All positional and keyword arguments are forwarded unchanged to
        ``super().__init__``.
        """
        super().__init__(*args, **kwargs)
        # Wrap the driver worker's execute_method with vllm.utils.make_async,
        # presumably yielding an awaitable version of the synchronous call.
        # NOTE(review): assumes self.driver_worker is set after
        # super().__init__ — confirm for ray spmd worker mode, where the
        # driver dummy worker can be None.
        self.driver_exec_method = make_async(self.driver_worker.execute_method)
        # Per-stage asyncio locks (presumably pipeline-parallel); left unset
        # here and expected to be created elsewhere — TODO confirm.
        self.pp_locks: Optional[List[asyncio.Lock]] = None