"vllm/model_executor/models/exaone4.py" did not exist on "15c6a079b1ccea03587031c9d709fb9dce0a1d93"
llm_engine.py 10.6 KB
Newer Older
1
2
# SPDX-License-Identifier: Apache-2.0

3
4
from collections.abc import Mapping
from typing import Optional, Union
5

6
7
from typing_extensions import TypeVar

8
import vllm.envs as envs
9
from vllm.config import ParallelConfig, VllmConfig
10
11
from vllm.engine.arg_utils import EngineArgs
from vllm.engine.metrics_types import StatLoggerBase
12
from vllm.inputs import INPUT_REGISTRY, InputRegistry, PromptType
13
14
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
15
from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalRegistry
16
from vllm.outputs import RequestOutput
17
18
from vllm.pooling_params import PoolingParams
from vllm.prompt_adapter.request import PromptAdapterRequest
19
from vllm.sampling_params import SamplingParams
20
21
from vllm.transformers_utils.tokenizer_group import (
    BaseTokenizerGroup, init_tokenizer_from_configs)
22
from vllm.usage.usage_lib import UsageContext
23
from vllm.v1.engine.core_client import EngineCoreClient
24
from vllm.v1.engine.output_processor import OutputProcessor
25
from vllm.v1.engine.parallel_sampling import SyncParallelSamplingManager
26
from vllm.v1.engine.processor import Processor
27
from vllm.v1.executor.abstract import Executor
28
29
30

# Module-level logger, named after this module via ``__name__``.
logger = init_logger(__name__)

31
32
# Type variable for tokenizer-group classes: bounded by, and defaulting to,
# ``BaseTokenizerGroup``.
_G = TypeVar("_G", bound=BaseTokenizerGroup, default=BaseTokenizerGroup)

33
34

class LLMEngine:
35
    """Legacy LLMEngine for backwards compatibility."""
36
37
38

    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        stat_loggers: Optional[dict[str, StatLoggerBase]] = None,
        input_registry: InputRegistry = INPUT_REGISTRY,
        mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
        use_cached_outputs: bool = False,
        multiprocess_mode: bool = False,
    ) -> None:
        """Wire up the tokenizer, the processors and the engine-core client.

        Args:
            vllm_config: Aggregate configuration; its model, cache,
                parallel, scheduler and LoRA sub-configs are read here.
            executor_class: Executor type forwarded to
                ``EngineCoreClient.make_client``.
            log_stats: Accepted but not read in this constructor; the
                components below are created with ``log_stats=False``.
            usage_context: Accepted but not read in this constructor.
            stat_loggers: Accepted but not read in this constructor.
            input_registry: Forwarded to the ``Processor``.
            mm_registry: Forwarded to the ``Processor``.
            use_cached_outputs: Accepted but not read in this constructor.
            multiprocess_mode: Forwarded to
                ``EngineCoreClient.make_client``. When false,
                ``self.model_executor`` is additionally exposed.
        """
        self.vllm_config = vllm_config
        self.model_config = vllm_config.model_config
        self.cache_config = vllm_config.cache_config

        # Bookkeeping for parallel sampling requests
        self.parallel_manager = SyncParallelSamplingManager()

        # important: init dp group before init the engine_core
        self.parallel_config = vllm_config.parallel_config
        self.dp_enabled = self.parallel_config.data_parallel_size > 1  # noqa
        self.should_execute_dummy_batch = False
        if self.dp_enabled:
            # NOTE: ``self.dp_group`` only exists when data parallelism is on.
            self.dp_group = self.parallel_config.stateless_init_dp_group()

        # Tokenizer (+ ensure liveness if running in another process).
        self.tokenizer = init_tokenizer_from_configs(
            model_config=vllm_config.model_config,
            scheduler_config=vllm_config.scheduler_config,
            parallel_config=vllm_config.parallel_config,
            lora_config=vllm_config.lora_config)
        self.tokenizer.ping()

        # Processor (convert Inputs --> EngineCoreRequests)
        self.processor = Processor(model_config=vllm_config.model_config,
                                   cache_config=vllm_config.cache_config,
                                   lora_config=vllm_config.lora_config,
                                   tokenizer=self.tokenizer,
                                   input_registry=input_registry,
                                   mm_registry=mm_registry)

        # OutputProcessor (convert EngineCoreOutputs --> RequestOutput).
        self.output_processor = OutputProcessor(self.tokenizer,
                                                log_stats=False)

        # EngineCore (gets EngineCoreRequests and gives EngineCoreOutputs)
        self.engine_core = EngineCoreClient.make_client(
            multiprocess_mode=multiprocess_mode,
            asyncio_mode=False,
            vllm_config=vllm_config,
            executor_class=executor_class,
            log_stats=False,  # FIXME: implement
        )

        if not multiprocess_mode:
            # for v0 compatibility
            self.model_executor = self.engine_core.engine_core.model_executor  # type: ignore

96
97
98
99
100
    @classmethod
    def from_engine_args(
        cls,
        engine_args: EngineArgs,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        stat_loggers: Optional[dict[str, StatLoggerBase]] = None,
        enable_multiprocessing: bool = False,
    ) -> "LLMEngine":
        """Creates an LLM engine from the engine arguments.

        Args:
            engine_args: Source of the engine config
                (``create_engine_config``) and of ``disable_log_stats``.
            usage_context: Passed to ``create_engine_config`` and on to the
                constructor.
            stat_loggers: Passed through to the constructor.
            enable_multiprocessing: Requested multiprocess mode; forced to
                ``True`` when ``envs.VLLM_ENABLE_V1_MULTIPROCESSING`` is
                truthy.

        Returns:
            A new instance of ``cls``.
        """
        # Create the engine configs.
        vllm_config = engine_args.create_engine_config(usage_context)
        executor_class = Executor.get_class(vllm_config)

        # The environment flag takes precedence over the caller's argument.
        if envs.VLLM_ENABLE_V1_MULTIPROCESSING:
            logger.debug("Enabling multiprocessing for LLMEngine.")
            enable_multiprocessing = True

        # Create the LLMEngine.
        return cls(vllm_config=vllm_config,
                   executor_class=executor_class,
                   log_stats=not engine_args.disable_log_stats,
                   usage_context=usage_context,
                   stat_loggers=stat_loggers,
                   multiprocess_mode=enable_multiprocessing)

    def get_num_unfinished_requests(self) -> int:
        """Return the unfinished-request count reported by the managers.

        The output processor's count is handed to the parallel-sampling
        manager, and the manager's result is returned.
        """
        num_in_output_processor = (
            self.output_processor.get_num_unfinished_requests())
        return self.parallel_manager.get_num_unfinished_requests(
            num_in_output_processor)
125
126

    def has_unfinished_requests(self) -> bool:
        """Report whether any request is still in flight.

        Without data parallelism this is the output processor's answer.
        With data parallelism enabled, the local answer is passed to
        ``has_unfinished_requests_dp`` and that result is returned.
        """
        has_unfinished = self.output_processor.has_unfinished_requests()
        if self.dp_enabled:
            return self.has_unfinished_requests_dp(has_unfinished)
        return has_unfinished

    def has_unfinished_requests_dp(self, has_unfinished: bool) -> bool:
        """Combine the local unfinished flag with the DP group's.

        Args:
            has_unfinished: Whether this engine still has work.

        Returns:
            The value from ``ParallelConfig.has_unfinished_dp`` for
            ``self.dp_group``.
        """
        group_has_unfinished = ParallelConfig.has_unfinished_dp(
            self.dp_group, has_unfinished)
        # Locally idle while the aggregate is still busy: flag a dummy
        # batch for the next ``step`` call.
        if group_has_unfinished and not has_unfinished:
            self.should_execute_dummy_batch = True
        return group_has_unfinished
138
139
140
141
142

    @classmethod
    def validate_outputs(cls, outputs, output_type):
        return outputs

143
    def abort_request(self, request_ids: list[str]) -> None:
        """Remove request_ids from EngineCore and Detokenizer.

        Args:
            request_ids: IDs handed first to ``engine_core.abort_requests``
                and then to ``output_processor.abort_requests``.
        """
        self.engine_core.abort_requests(request_ids)
        self.output_processor.abort_requests(request_ids)
148

149
150
151
152
153
154
155
156
157
158
159
    def add_request(
        self,
        request_id: str,
        prompt: PromptType,
        params: Union[SamplingParams, PoolingParams],
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
        priority: int = 0,
    ) -> None:
        """Add request.

        Requests whose ``params`` is ``None``, is a ``PoolingParams``, or
        has ``n == 1`` go straight to ``_add_request``. Every other request
        is handed to the parallel-sampling manager together with
        ``_add_request`` as the callback.

        All arguments are forwarded unchanged by keyword.
        """
        kwargs = dict(request_id=request_id,
                      prompt=prompt,
                      params=params,
                      arrival_time=arrival_time,
                      lora_request=lora_request,
                      trace_headers=trace_headers,
                      prompt_adapter_request=prompt_adapter_request,
                      priority=priority)

        # Handle parallel sampling requests differently.
        is_single_sample = (params is None
                            or isinstance(params, PoolingParams)
                            or params.n == 1)
        if is_single_sample:
            self._add_request(**kwargs)
            return

        # Special handling for parallel sampling requests
        self.parallel_manager.add_request_parallel_sampling(
            add_request=self._add_request, **kwargs)

    def _add_request(
        self,
        request_id: str,
        prompt: PromptType,
        params: Union[SamplingParams, PoolingParams],
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
        priority: int = 0,
    ) -> None:
        """Add request, `n=1`

        The arguments are passed positionally, in signature order, to
        ``processor.process_inputs``. The resulting request is registered
        with the output processor and then with the engine core.
        """
        # 1) Process raw inputs into the request.
        request = self.processor.process_inputs(request_id, prompt, params,
                                                arrival_time, lora_request,
                                                trace_headers,
                                                prompt_adapter_request,
                                                priority)

        # 2) Make a new RequestState and queue.
        self.output_processor.add_request(request)

        # 3) Add the request to EngineCore.
        self.engine_core.add_request(request)
202

203
    def step(self) -> list[RequestOutput]:
        """Run one engine iteration and return its request outputs.

        If ``should_execute_dummy_batch`` is set, the flag is cleared, the
        engine core runs a dummy batch, and an empty list is returned.
        Otherwise engine-core outputs are fetched and processed, requests
        flagged for abort are aborted, and the result of
        ``parallel_manager.step`` is returned.
        """
        if self.should_execute_dummy_batch:
            self.should_execute_dummy_batch = False
            self.engine_core.execute_dummy_batch()
            return []

        # 1) Get EngineCoreOutput from the EngineCore.
        outputs = self.engine_core.get_output()

        # 2) Process EngineCoreOutputs.
        processed_outputs = self.output_processor.process_outputs(
            outputs.outputs)

        # 3) Abort any reqs that finished due to stop strings.
        self.engine_core.abort_requests(processed_outputs.reqs_to_abort)

        # 4) Process unfinished parallel sampling requests
        request_outputs = processed_outputs.request_outputs
        return self.parallel_manager.step(request_outputs)
224

225
    def get_model_config(self):
        """Return the model config stored on this engine."""
        return self.model_config
227

228
    def start_profile(self):
        """Call ``engine_core.profile(True)``."""
        self.engine_core.profile(True)
230

231
    def stop_profile(self):
        """Call ``engine_core.profile(False)``."""
        self.engine_core.profile(False)
233

234
235
236
    def reset_prefix_cache(self):
        """Forward the prefix-cache reset to the engine core.

        The engine core's return value is not propagated.
        """
        core = self.engine_core
        core.reset_prefix_cache()

237
238
239
240
241
242
    def sleep(self, level: int = 1):
        """Forward a sleep request to the engine core.

        Args:
            level: Passed positionally to ``engine_core.sleep``; defaults
                to 1.
        """
        core = self.engine_core
        core.sleep(level)

    def wake_up(self):
        """Forward a wake-up request to the engine core.

        The engine core's return value is not propagated.
        """
        core = self.engine_core
        core.wake_up()

243
244
    def get_tokenizer_group(
        self,
        group_type: type[_G] = BaseTokenizerGroup,
    ) -> _G:
        """Return the engine's tokenizer group, checked against a type.

        Args:
            group_type: Class the tokenizer group must be an instance of.

        Returns:
            ``self.tokenizer``.

        Raises:
            ValueError: If ``self.tokenizer`` is ``None``.
            TypeError: If ``self.tokenizer`` is not a ``group_type``
                instance.
        """
        tokenizer_group = self.tokenizer

        if tokenizer_group is None:
            raise ValueError("Unable to get tokenizer because "
                             "skip_tokenizer_init is True")
        if not isinstance(tokenizer_group, group_type):
            raise TypeError("Invalid type of tokenizer group. "
                            f"Expected type: {group_type}, but "
                            f"found type: {type(tokenizer_group)}")

        return tokenizer_group
258
259
260
261
262
263
264
265
266

    def add_lora(self, lora_request: LoRARequest) -> bool:
        """Ask the engine core to load the adapter in ``lora_request``.

        Returns:
            Whatever ``engine_core.add_lora`` returns.
        """
        added = self.engine_core.add_lora(lora_request)
        return added

    def remove_lora(self, lora_id: int) -> bool:
        """Ask the engine core to remove the adapter with ``lora_id``.

        Returns:
            Whatever ``engine_core.remove_lora`` returns.
        """
        removed = self.engine_core.remove_lora(lora_id)
        return removed

267
    def list_loras(self) -> set[int]:
        """List all registered adapters.

        Returns:
            Whatever ``engine_core.list_loras`` returns.
        """
        return self.engine_core.list_loras()

    def pin_lora(self, lora_id: int) -> bool:
        """Ask the engine core to pin the adapter with ``lora_id``.

        Returns:
            Whatever ``engine_core.pin_lora`` returns.
        """
        pinned = self.engine_core.pin_lora(lora_id)
        return pinned