"vscode:/vscode.git/clone" did not exist on "01a55941f5443c16a88889995f8149edcee2eec3"
xpu_executor.py 2.79 KB
Newer Older
1
from typing import Callable, List, Optional, Tuple, Type, Union
2
3
4

import torch

5
from vllm.config import ModelConfig, ParallelConfig
6
7
8
from vllm.executor.executor_base import ExecutorAsyncBase
from vllm.executor.gpu_executor import GPUExecutor
from vllm.logger import init_logger
9
10
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.sequence import ExecuteModelRequest, PoolerOutput
11
from vllm.utils import make_async
12
from vllm.worker.worker_base import WorkerBase
13
14
15
16
17
18

logger = init_logger(__name__)


class XPUExecutor(GPUExecutor):
    """Executor for the XPU backend, layered on top of ``GPUExecutor``.

    It checks the XPU-specific configuration up front, selects the XPU
    worker class, and forwards model execution to ``self.driver_worker``.
    """

    # Class-level flag: this executor is marked as not using Ray.
    uses_ray: bool = False

    def _init_executor(self) -> None:
        """Check the XPU configuration, then run ``GPUExecutor`` setup.

        Raises:
            AssertionError: If the configured device type is not ``"xpu"``
                or a speculative config is present.
        """
        assert self.device_config.device_type == "xpu"
        assert self.speculative_config is None, (
            "Speculative decoding not yet supported for XPU backend")

        # Adjust the model config for XPU before delegating to the
        # GPUExecutor initialisation.
        adjusted_config = _verify_and_get_model_config(self.model_config)
        self.model_config = adjusted_config
        GPUExecutor._init_executor(self)

    def _get_worker_module_and_class(
            self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]:
        """Return the worker's module name, class name and class factory.

        The factory slot is always ``None`` for this executor.

        Raises:
            NotImplementedError: If a speculative config is set.
        """
        # Guard clause: reject speculative decoding before picking a worker.
        if self.speculative_config is not None:
            raise NotImplementedError(
                "XPU does not support speculative decoding")
        return ("vllm.worker.xpu_worker", "XPUWorker", None)

    def execute_model(
        self, execute_model_req: ExecuteModelRequest
    ) -> Optional[List[Union[SamplerOutput, PoolerOutput]]]:
        """Pass the request to the driver worker and return its result."""
        return self.driver_worker.execute_model(execute_model_req)


class XPUExecutorAsync(XPUExecutor, ExecutorAsyncBase):
    """Asynchronous counterpart of ``XPUExecutor``."""

    async def execute_model_async(
        self,
        execute_model_req: ExecuteModelRequest,
    ) -> List[SamplerOutput]:
        """Await the driver worker's ``execute_model`` for the request.

        The worker method is wrapped with ``make_async`` and the awaited
        result is returned as-is.
        """
        run_on_worker = make_async(self.driver_worker.execute_model)
        return await run_on_worker(execute_model_req=execute_model_req)


def _verify_and_get_model_config(config: ModelConfig) -> ModelConfig:
    """Adjust a model config in place for XPU and return the same object.

    A ``torch.bfloat16`` dtype is replaced with ``torch.float16``, and
    ``enforce_eager`` is switched on when it is falsy. A warning is logged
    for each adjustment that is made.
    """
    requests_bfloat16 = config.dtype == torch.bfloat16
    if requests_bfloat16:
        logger.warning(
            "bfloat16 is not fully supported on XPU, casting to float16.")
        config.dtype = torch.float16

    # Already in eager mode: nothing further to change.
    if config.enforce_eager:
        return config

    logger.warning(
        "CUDA graph is not supported on XPU, fallback to the eager "
        "mode.")
    config.enforce_eager = True
    return config
69
70
71
72
73
74
75
76
77
78


def _verify_and_get_parallel_config(config: ParallelConfig) -> ParallelConfig:
    """Force the distributed executor backend to ``"ray"`` when it is set.

    The config is left untouched when the backend is ``None`` or already
    ``"ray"``. Any other value triggers a warning and is overwritten in
    place. The same config object is returned in every case.
    """
    requested_backend = config.distributed_executor_backend
    # Unset or already "ray": keep the config unchanged.
    if requested_backend is None or requested_backend == "ray":
        return config

    logger.warning(
        "%s is not supported on XPU, fallback to ray distributed executor "
        "backend.", requested_backend)
    config.distributed_executor_backend = "ray"
    return config