"vllm/vscode:/vscode.git/clone" did not exist on "13d63b65e0604db23c1485d370dbf9adc4e651c7"
uniproc_executor.py 3.03 KB
Newer Older
1
2
3
import os
from typing import Optional, Tuple

Robert Shaw's avatar
Robert Shaw committed
4
from vllm.config import VllmConfig
5
6
from vllm.logger import init_logger
from vllm.utils import get_distributed_init_method, get_ip, get_open_port
7
from vllm.v1.executor.abstract import Executor
8
9
10
11
12
13
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.worker.gpu_worker import Worker

logger = init_logger(__name__)


14
class UniprocExecutor(Executor):
15

Robert Shaw's avatar
Robert Shaw committed
16
17
    def __init__(self, vllm_config: VllmConfig) -> None:
        self.vllm_config = vllm_config
18
19
20
21
22
23
24
25
26
27
        self.model_config = vllm_config.model_config
        self.cache_config = vllm_config.cache_config
        self.lora_config = vllm_config.lora_config
        self.load_config = vllm_config.load_config
        self.parallel_config = vllm_config.parallel_config
        self.scheduler_config = vllm_config.scheduler_config
        self.device_config = vllm_config.device_config
        self.speculative_config = vllm_config.speculative_config
        self.prompt_adapter_config = vllm_config.prompt_adapter_config
        self.observability_config = vllm_config.observability_config
28

29
        self.worker: Worker = self._create_worker()
30
        self.worker.init_device()
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
        self.worker.load_model()

    def _create_worker(
            self,
            local_rank: int = 0,
            rank: int = 0,
            distributed_init_method: Optional[str] = None) -> Worker:
        """Return worker init args for a given rank."""
        # see https://github.com/NVIDIA/nccl/issues/1234
        os.environ['NCCL_CUMEM_ENABLE'] = '0'

        if distributed_init_method is None:
            distributed_init_method = get_distributed_init_method(
                get_ip(), get_open_port())
        return Worker(
46
            vllm_config=self.vllm_config,
47
48
49
50
51
52
53
54
55
56
57
            local_rank=local_rank,
            rank=rank,
            distributed_init_method=distributed_init_method,
        )

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks by invoking the
        underlying worker.
        """
        return self.worker.determine_num_available_blocks()

58
    def initialize(self, num_gpu_blocks: int) -> None:
59
60
61
62
63
64
65
66
67
68
69
70
71
72
        """Initialize the KV cache by invoking the underlying worker.
        """
        # NOTE: This is logged in the executor because there can be >1 worker
        # with other executors. We could log in the engine level, but work
        # remains to abstract away the device for non-GPU configurations.
        logger.info("# GPU blocks: %d", num_gpu_blocks)
        self.worker.initialize_cache(num_gpu_blocks)
        self.worker.compile_or_warm_up_model()

    def execute_model(
        self,
        scheduler_output,
    ) -> ModelRunnerOutput:
        output = self.worker.execute_model(scheduler_output)
73
        assert output is not None
74
75
        return output

76
77
78
79
    def profile(self, is_start: bool = True):
        self.worker.profile(is_start)

    def shutdown(self):
80
        pass
81

82
    def check_health(self) -> None:
83
        # UniprocExecutor will always be healthy as long as
84
85
        # it's running.
        return