llm_engine.py 6.24 KB
Newer Older
1
from typing import Dict, List, Mapping, Optional, Type, Union
2

3
from vllm.config import VllmConfig
4
5
from vllm.engine.arg_utils import EngineArgs
from vllm.engine.metrics_types import StatLoggerBase
6
7
from vllm.envs import VLLM_ENABLE_V1_MULTIPROCESSING
from vllm.inputs import INPUT_REGISTRY, InputRegistry, PromptType
8
9
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
10
from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalRegistry
11
from vllm.outputs import RequestOutput
12
13
from vllm.pooling_params import PoolingParams
from vllm.prompt_adapter.request import PromptAdapterRequest
14
15
from vllm.sampling_params import SamplingParams
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
16
from vllm.usage.usage_lib import UsageContext
17
18
19
from vllm.v1.engine.core_client import EngineCoreClient
from vllm.v1.engine.detokenizer import Detokenizer
from vllm.v1.engine.processor import Processor
20
21
22
23
24
25
from vllm.v1.executor.gpu_executor import GPUExecutor

# Module-level logger, created via init_logger and named after this module.
logger = init_logger(__name__)


class LLMEngine:
    """Legacy LLMEngine for backwards compatibility.

    Synchronous front end that wires together the three components built in
    ``__init__``:

    * ``Processor``: converts raw inputs into EngineCoreRequests.
    * ``Detokenizer``: converts EngineCoreOutputs into RequestOutputs.
    * ``EngineCoreClient``: receives EngineCoreRequests and returns
      EngineCoreOutputs.
    """

    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: Type[GPUExecutor],
        log_stats: bool,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        stat_loggers: Optional[Dict[str, StatLoggerBase]] = None,
        input_registry: InputRegistry = INPUT_REGISTRY,
        mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
        use_cached_outputs: bool = False,
        multiprocess_mode: bool = False,
    ) -> None:
        """Build the tokenizer, processor, detokenizer and engine core client.

        Args:
            vllm_config: Aggregate configuration; its model, scheduler,
                parallel and LoRA sub-configs are read here.
            executor_class: Executor class forwarded to the engine core
                client.
            log_stats: Accepted for interface compatibility; not used by
                this constructor.
            usage_context: Forwarded to the engine core client.
            stat_loggers: Accepted for interface compatibility; not used by
                this constructor.
            input_registry: Forwarded to the ``Processor``.
            mm_registry: Forwarded to the ``Processor``.
            use_cached_outputs: Accepted for interface compatibility; not
                used by this constructor.
            multiprocess_mode: Forwarded to
                ``EngineCoreClient.make_client``.
        """
        # TODO: Can we avoid this?
        # Kept on the instance so that get_model_config() can return it.
        self.model_config = vllm_config.model_config

        # Tokenizer (+ ensure liveness if running in another process).
        self.tokenizer = init_tokenizer_from_configs(
            model_config=vllm_config.model_config,
            scheduler_config=vllm_config.scheduler_config,
            parallel_config=vllm_config.parallel_config,
            enable_lora=bool(vllm_config.lora_config))
        self.tokenizer.ping()

        # Processor (convert Inputs --> EngineCoreRequests)
        self.processor = Processor(vllm_config.model_config,
                                   vllm_config.lora_config, self.tokenizer,
                                   input_registry, mm_registry)

        # Detokenizer (converts EngineCoreOutputs --> RequestOutput)
        self.detokenizer = Detokenizer(
            tokenizer_name=vllm_config.model_config.tokenizer,
            tokenizer_mode=vllm_config.model_config.tokenizer_mode,
            trust_remote_code=vllm_config.model_config.trust_remote_code,
            revision=vllm_config.model_config.tokenizer_revision,
        )

        # EngineCore (gets EngineCoreRequests and gives EngineCoreOutputs)
        self.engine_core = EngineCoreClient.make_client(
            vllm_config,
            executor_class,
            usage_context,
            multiprocess_mode=multiprocess_mode,
            asyncio_mode=False,
        )

    @classmethod
    def from_engine_args(
        cls,
        engine_args: EngineArgs,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        stat_loggers: Optional[Dict[str, StatLoggerBase]] = None,
        enable_multiprocessing: bool = False,
    ) -> "LLMEngine":
        """Creates an LLM engine from the engine arguments.

        Args:
            engine_args: Source of the engine configuration and of the
                ``disable_log_stats`` flag.
            usage_context: Used to create the engine config and forwarded
                to the constructor.
            stat_loggers: Forwarded to the constructor.
            enable_multiprocessing: Requested multiprocessing mode. It is
                overridden to ``True`` whenever
                ``VLLM_ENABLE_V1_MULTIPROCESSING`` is truthy.

        Returns:
            A new engine instance.
        """
        # Create the engine configs.
        vllm_config = engine_args.create_engine_config(usage_context)
        executor_class = cls._get_executor_cls(vllm_config)

        # The environment flag takes precedence over the caller's argument.
        if VLLM_ENABLE_V1_MULTIPROCESSING:
            logger.debug("Enabling multiprocessing for LLMEngine.")
            enable_multiprocessing = True

        # Create the LLMEngine.
        return cls(vllm_config=vllm_config,
                   executor_class=executor_class,
                   log_stats=not engine_args.disable_log_stats,
                   usage_context=usage_context,
                   stat_loggers=stat_loggers,
                   multiprocess_mode=enable_multiprocessing)

    @classmethod
    def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[GPUExecutor]:
        """Return the executor class; always ``GPUExecutor`` here.

        ``vllm_config`` is currently not consulted.
        """
        return GPUExecutor

    def stop_remote_worker_execution_loop(self) -> None:
        """Not supported by this engine.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("TP not implemented yet.")

    def get_num_unfinished_requests(self) -> int:
        """Return the detokenizer's count of unfinished requests."""
        return self.detokenizer.get_num_unfinished_requests()

    def has_unfinished_requests(self) -> bool:
        """Return whether the detokenizer reports unfinished requests."""
        return self.detokenizer.has_unfinished_requests()

    @classmethod
    def validate_outputs(cls, outputs, output_type):
        """Return ``outputs`` unchanged; ``output_type`` is not checked."""
        return outputs

    def abort_request(self, request_ids: List[str]) -> None:
        """Remove request_ids from EngineCore and Detokenizer."""

        self.engine_core.abort_requests(request_ids)
        self.detokenizer.abort_requests(request_ids)

    def add_request(
        self,
        request_id: str,
        prompt: PromptType,
        params: Union[SamplingParams, PoolingParams],
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
        priority: int = 0,
    ) -> None:
        """Process a raw request and register it with both components.

        All arguments are forwarded, in order, to
        ``Processor.process_inputs``, which yields one request object for
        the detokenizer and one for the engine core.
        """
        # 1) Process raw inputs into the request.
        detokenizer_req, engine_core_req = self.processor.process_inputs(
            request_id, prompt, params, arrival_time, lora_request,
            trace_headers, prompt_adapter_request, priority)

        # 2) Add the request to Detokenizer.
        self.detokenizer.add_request(detokenizer_req)

        # 3) Add the request to EngineCore.
        self.engine_core.add_request(engine_core_req)

    def step(self) -> List[RequestOutput]:
        """Fetch engine core output, detokenize it and return the results.

        Requests that the detokenizer reports as needing an abort are
        aborted in both the engine core and the detokenizer before
        returning.

        Returns:
            The request outputs produced by the detokenizer for this step.
        """
        # 1) Get EngineCoreOutput from the EngineCore.
        engine_core_outputs = self.engine_core.get_output()

        # 2) Detokenizer the EngineCoreOutput.
        request_outputs, requests_to_abort = self.detokenizer.step(
            engine_core_outputs)

        # 3) Abort requests that finished due to stopping criteria.
        if requests_to_abort:
            self.abort_request(requests_to_abort)

        return request_outputs

    # TODO(rob): Can we get rid of these?

    def get_model_config(self):
        """Return the model config captured from ``vllm_config`` at init."""
        return self.model_config

    def start_profile(self):
        """No-op placeholder; profiling is not wired up in this engine."""
        pass

    def stop_profile(self):
        """No-op placeholder; profiling is not wired up in this engine."""
        pass

    def get_tokenizer_group(self, group_type):
        """Return the tokenizer group created in ``__init__``.

        Args:
            group_type: Class (or tuple of classes) the tokenizer group is
                expected to be an instance of.

        Returns:
            The object stored in ``self.tokenizer``.

        Raises:
            TypeError: If the tokenizer group is not an instance of
                ``group_type``.
        """
        tokenizer_group = self.tokenizer
        if not isinstance(tokenizer_group, group_type):
            raise TypeError("Invalid type of tokenizer group. "
                            f"Expected type: {group_type}, but "
                            f"found type: {type(tokenizer_group)}")
        return tokenizer_group