abstract.py 3.63 KB
Newer Older
1
2
# SPDX-License-Identifier: Apache-2.0

3
from typing import Type
4
5

from vllm.config import VllmConfig
6
7
8
9
10
11
12
from vllm.executor.executor_base import ExecutorBase
from vllm.executor.ray_distributed_executor import (  # noqa
    RayDistributedExecutor as RayDistributedExecutorV0)
from vllm.executor.uniproc_executor import (  # noqa
    ExecutorWithExternalLauncher as ExecutorWithExternalLauncherV0)
from vllm.executor.uniproc_executor import (  # noqa
    UniProcExecutor as UniProcExecutorV0)
13
from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec
14
15
16
from vllm.v1.outputs import ModelRunnerOutput


17
18
19
20
class Executor(ExecutorBase):
    """
    Abstract class for v1 executors. It mainly defines the methods that are
    specific to v1. Methods shared by v0 and v1 belong in ExecutorBase.
    """

    @staticmethod
    def get_class(vllm_config: VllmConfig) -> Type["Executor"]:
        """
        Select the executor class for the given configuration.

        The choice is driven by
        ``vllm_config.parallel_config.distributed_executor_backend``, which
        may be either a class or one of the strings ``"ray"``, ``"mp"``,
        ``"uni"`` or ``"external_launcher"``.

        Raises:
            TypeError: if the backend is a class that does not derive from
                ExecutorBase.
            ValueError: if the backend is not one of the recognized values.
        """
        executor_class: Type[Executor]
        parallel_config = vllm_config.parallel_config
        distributed_executor_backend = (
            parallel_config.distributed_executor_backend)
        # distributed_executor_backend must be set in VllmConfig.__post_init__
        if isinstance(distributed_executor_backend, type):
            # A user-supplied class is used as-is once its base is validated.
            if not issubclass(distributed_executor_backend, ExecutorBase):
                raise TypeError(
                    "distributed_executor_backend must be a subclass of "
                    f"ExecutorBase. Got {distributed_executor_backend}.")
            executor_class = distributed_executor_backend
        elif distributed_executor_backend == "ray":
            executor_class = RayDistributedExecutor
        elif distributed_executor_backend == "mp":
            # Imported here rather than at module level, so the module is
            # only loaded when the "mp" backend is actually selected.
            from vllm.v1.executor.multiproc_executor import MultiprocExecutor
            executor_class = MultiprocExecutor
        elif distributed_executor_backend == "uni":
            executor_class = UniProcExecutor
        elif distributed_executor_backend == "external_launcher":
            # TODO: make v1 scheduling deterministic
            # to support external launcher
            executor_class = ExecutorWithExternalLauncher
        else:
            raise ValueError("Unknown distributed executor backend: "
                             f"{distributed_executor_backend}")
        return executor_class

    def initialize(self, kv_cache_config: KVCacheConfig) -> None:
        """
        Initialize the KV caches of the underlying workers, then compile or
        warm up the model on them.

        Both steps are issued through ``collective_rpc``: first
        ``"initialize_cache"`` with ``kv_cache_config``, then
        ``"compile_or_warm_up_model"``.
        """
        self.collective_rpc("initialize_cache", args=(kv_cache_config, ))
        self.collective_rpc("compile_or_warm_up_model")

    def determine_available_memory(self) -> int:  # in bytes
        """
        Return the smallest value reported by the workers'
        ``"determine_available_memory"`` RPC.
        """
        output = self.collective_rpc("determine_available_memory")
        # Since we use a shared centralized controller, we take the minimum
        # memory size across all workers to make sure all the memory
        # operators can be applied to all workers.
        return min(output)

    def get_kv_cache_spec(self) -> KVCacheSpec:
        """
        Return the KV cache spec reported by the workers.

        Every worker's ``"get_kv_cache_spec"`` result is asserted to be equal
        to the first one, and the first one is returned.
        """
        output = self.collective_rpc("get_kv_cache_spec")
        for x in output:
            assert x == output[0], (
                "All workers must report the same KV cache spec.")
        return output[0]

    def execute_model(
        self,
        scheduler_output,
    ) -> ModelRunnerOutput:
        """
        Invoke ``"execute_model"`` on the workers with ``scheduler_output``
        and return the first entry of the collected results.
        """
        output = self.collective_rpc("execute_model",
                                     args=(scheduler_output, ))
        return output[0]

    def profile(self, is_start: bool = True) -> None:
        """
        Forward a ``"profile"`` RPC carrying ``is_start`` to the workers.

        NOTE(review): ``is_start`` presumably selects between starting and
        stopping profiling -- confirm against the worker implementation.
        """
        self.collective_rpc("profile", args=(is_start, ))


class UniProcExecutor(UniProcExecutorV0, Executor):
    """v1 executor that combines UniProcExecutorV0 with the v1 Executor.

    No members are added or overridden here; everything comes from the two
    base classes.
    """


class ExecutorWithExternalLauncher(ExecutorWithExternalLauncherV0, Executor):
    """v1 executor that combines ExecutorWithExternalLauncherV0 with the v1
    Executor.

    No members are added or overridden here; everything comes from the two
    base classes.
    """
90
91


92
93
class RayDistributedExecutor(RayDistributedExecutorV0, Executor):
    """v1 executor that combines RayDistributedExecutorV0 with the v1
    Executor.

    No members are added or overridden here; everything comes from the two
    base classes.
    """