Unverified Commit a111d015 authored by youkaichao's avatar youkaichao Committed by GitHub
Browse files

[platforms] absorb worker cls difference into platforms folder (#10555)


Signed-off-by: default avataryoukaichao <youkaichao@gmail.com>
Co-authored-by: default avatarNick Hill <nhill@redhat.com>
parent 446c7806
...@@ -926,56 +926,56 @@ class LoadConfig: ...@@ -926,56 +926,56 @@ class LoadConfig:
f"{rocm_supported_load_format}") f"{rocm_supported_load_format}")
@dataclass
class ParallelConfig: class ParallelConfig:
"""Configuration for the distributed execution. """Configuration for the distributed execution."""
Args: pipeline_parallel_size: int = 1 # Number of pipeline parallel groups.
pipeline_parallel_size: Number of pipeline parallel groups. tensor_parallel_size: int = 1 # Number of tensor parallel groups.
tensor_parallel_size: Number of tensor parallel groups.
worker_use_ray: Deprecated, use distributed_executor_backend instead.
max_parallel_loading_workers: Maximum number of multiple batches
when load model sequentially. To avoid RAM OOM when using tensor
parallel and large models.
disable_custom_all_reduce: Disable the custom all-reduce kernel and
fall back to NCCL.
tokenizer_pool_config: Config for the tokenizer pool.
If None, will use synchronous tokenization.
ray_workers_use_nsight: Whether to profile Ray workers with nsight, see
https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html#profiling-nsight-profiler.
placement_group: ray distributed model workers placement group.
distributed_executor_backend: Backend to use for distributed model
workers, either "ray" or "mp" (multiprocessing). If the product
of pipeline_parallel_size and tensor_parallel_size is less than
or equal to the number of GPUs available, "mp" will be used to
keep processing on a single host. Otherwise, this will default
to "ray" if Ray is installed and fail otherwise. Note that tpu
and hpu only support Ray for distributed inference.
"""
def __init__( # Deprecated, use distributed_executor_backend instead.
self, worker_use_ray: Optional[bool] = None
pipeline_parallel_size: int,
tensor_parallel_size: int, # Maximum number of multiple batches
worker_use_ray: Optional[bool] = None, # when load model sequentially. To avoid RAM OOM when using tensor
max_parallel_loading_workers: Optional[int] = None, # parallel and large models.
disable_custom_all_reduce: bool = False, max_parallel_loading_workers: Optional[int] = None
tokenizer_pool_config: Optional[TokenizerPoolConfig] = None,
ray_workers_use_nsight: bool = False, # Disable the custom all-reduce kernel and fall back to NCCL.
placement_group: Optional["PlacementGroup"] = None, disable_custom_all_reduce: bool = False
distributed_executor_backend: Optional[Union[
str, Type["ExecutorBase"]]] = None, # Config for the tokenizer pool. If None, will use synchronous tokenization.
) -> None: tokenizer_pool_config: Optional[TokenizerPoolConfig] = None
self.pipeline_parallel_size = pipeline_parallel_size
self.tensor_parallel_size = tensor_parallel_size # Whether to profile Ray workers with nsight, see https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html#profiling-nsight-profiler.
self.distributed_executor_backend = distributed_executor_backend ray_workers_use_nsight: bool = False
self.max_parallel_loading_workers = max_parallel_loading_workers
self.disable_custom_all_reduce = disable_custom_all_reduce # ray distributed model workers placement group.
self.tokenizer_pool_config = tokenizer_pool_config placement_group: Optional["PlacementGroup"] = None
self.ray_workers_use_nsight = ray_workers_use_nsight
self.placement_group = placement_group # Backend to use for distributed model
self.world_size = pipeline_parallel_size * self.tensor_parallel_size # workers, either "ray" or "mp" (multiprocessing). If the product
# of pipeline_parallel_size and tensor_parallel_size is less than
if worker_use_ray: # or equal to the number of GPUs available, "mp" will be used to
# keep processing on a single host. Otherwise, this will default
# to "ray" if Ray is installed and fail otherwise. Note that tpu
# and hpu only support Ray for distributed inference.
distributed_executor_backend: Optional[Union[str,
Type["ExecutorBase"]]] = None
# the full name of the worker class to use. If "auto", the worker class
# will be determined based on the platform.
worker_cls: str = "auto"
world_size: int = field(init=False)
rank: int = 0
def __post_init__(self) -> None:
self.world_size = self.pipeline_parallel_size * \
self.tensor_parallel_size
if self.worker_use_ray:
if self.distributed_executor_backend is None: if self.distributed_executor_backend is None:
self.distributed_executor_backend = "ray" self.distributed_executor_backend = "ray"
elif not self.use_ray: elif not self.use_ray:
...@@ -1026,7 +1026,6 @@ class ParallelConfig: ...@@ -1026,7 +1026,6 @@ class ParallelConfig:
backend) backend)
self._verify_args() self._verify_args()
self.rank: int = 0
@property @property
def use_ray(self) -> bool: def use_ray(self) -> bool:
...@@ -1059,100 +1058,97 @@ class ParallelConfig: ...@@ -1059,100 +1058,97 @@ class ParallelConfig:
"run with Ray.") "run with Ray.")
@dataclass
class SchedulerConfig: class SchedulerConfig:
"""Scheduler configuration. """Scheduler configuration."""
Args: task: str = "generate" # The task to use the model for.
task: The task to use the model for.
max_num_batched_tokens: Maximum number of tokens to be processed in # Maximum number of tokens to be processed in a single iteration.
a single iteration. max_num_batched_tokens: int = field(default=None) # type: ignore
max_num_seqs: Maximum number of sequences to be processed in a single
iteration. # Maximum number of sequences to be processed in a single iteration.
max_model_len: Maximum length of a sequence (including prompt max_num_seqs: int = 128
and generated text).
num_lookahead_slots: The number of slots to allocate per sequence per # Maximum length of a sequence (including prompt and generated text).
step, beyond the known token ids. This is used in speculative max_model_len: int = 8192
decoding to store KV activations of tokens which may or may not be
accepted. # The number of slots to allocate per sequence per
delay_factor: Apply a delay (of delay factor multiplied by previous # step, beyond the known token ids. This is used in speculative
prompt latency) before scheduling next prompt. # decoding to store KV activations of tokens which may or may not be
enable_chunked_prefill: If True, prefill requests can be chunked based # accepted.
on the remaining max_num_batched_tokens. num_lookahead_slots: int = 0
preemption_mode: Whether to perform preemption by swapping or
recomputation. If not specified, we determine the mode as follows: # Apply a delay (of delay factor multiplied by previous
We use recomputation by default since it incurs lower overhead than # prompt latency) before scheduling next prompt.
swapping. However, when the sequence group has multiple sequences delay_factor: float = 0.0
(e.g., beam search), recomputation is not currently supported. In
such a case, we use swapping instead. # If True, prefill requests can be chunked based
send_delta_data: Private API. If used, scheduler sends delta data to # on the remaining max_num_batched_tokens.
workers instead of an entire data. It should be enabled only enable_chunked_prefill: bool = False
when SPMD worker architecture is enabled. I.e.,
VLLM_USE_RAY_SPMD_WORKER=1 is_multimodal_model: bool = False
policy: The scheduling policy to use. "fcfs" (default) or "priority".
""" # Whether to perform preemption by swapping or
# recomputation. If not specified, we determine the mode as follows:
# We use recomputation by default since it incurs lower overhead than
# swapping. However, when the sequence group has multiple sequences
# (e.g., beam search), recomputation is not currently supported. In
# such a case, we use swapping instead.
preemption_mode: Optional[str] = None
num_scheduler_steps: int = 1
def __init__(self, multi_step_stream_outputs: bool = False
task: _Task,
max_num_batched_tokens: Optional[int], # Private API. If used, scheduler sends delta data to
max_num_seqs: int, # workers instead of an entire data. It should be enabled only
max_model_len: int, # when SPMD worker architecture is enabled. I.e.,
num_lookahead_slots: int = 0, # VLLM_USE_RAY_SPMD_WORKER=1
delay_factor: float = 0.0, send_delta_data: bool = False
enable_chunked_prefill: bool = False,
is_multimodal_model: bool = False, # The scheduling policy to use. "fcfs" (default) or "priority".
preemption_mode: Optional[str] = None, policy: str = "fcfs"
num_scheduler_steps: int = 1,
multi_step_stream_outputs: bool = False, chunked_prefill_enabled: bool = field(init=False)
send_delta_data: bool = False,
policy: str = "fcfs") -> None: def __post_init__(self) -> None:
if max_num_batched_tokens is None: if self.max_num_batched_tokens is None:
if enable_chunked_prefill: if self.enable_chunked_prefill:
if num_scheduler_steps > 1: if self.num_scheduler_steps > 1:
# Multi-step Chunked-Prefill doesn't allow prompt-chunking # Multi-step Chunked-Prefill doesn't allow prompt-chunking
# for now. Have max_num_batched_tokens set to max_model_len # for now. Have max_num_batched_tokens set to max_model_len
# so we don't reject sequences on account of a short # so we don't reject sequences on account of a short
# max_num_batched_tokens. # max_num_batched_tokens.
max_num_batched_tokens = max(max_model_len, 2048) self.max_num_batched_tokens = max(self.max_model_len, 2048)
else: else:
# It is the values that have the best balance between ITL # It is the values that have the best balance between ITL
# and TTFT on A100. Note it is not optimized for throughput. # and TTFT on A100. Note it is not optimized for throughput.
max_num_batched_tokens = 512 self.max_num_batched_tokens = 512
else: else:
# If max_model_len is too short, use 2048 as the default value # If max_model_len is too short, use 2048 as the default value
# for higher throughput. # for higher throughput.
max_num_batched_tokens = max(max_model_len, 2048) self.max_num_batched_tokens = max(self.max_model_len, 2048)
if task == "embedding": if self.task == "embedding":
# For embedding, choose specific value for higher throughput # For embedding, choose specific value for higher throughput
max_num_batched_tokens = max( self.max_num_batched_tokens = max(
max_num_batched_tokens, self.max_num_batched_tokens,
_EMBEDDING_MODEL_MAX_NUM_BATCHED_TOKENS, _EMBEDDING_MODEL_MAX_NUM_BATCHED_TOKENS,
) )
if is_multimodal_model: if self.is_multimodal_model:
# The value needs to be at least the number of multimodal tokens # The value needs to be at least the number of multimodal tokens
max_num_batched_tokens = max( self.max_num_batched_tokens = max(
max_num_batched_tokens, self.max_num_batched_tokens,
_MULTIMODAL_MODEL_MAX_NUM_BATCHED_TOKENS, _MULTIMODAL_MODEL_MAX_NUM_BATCHED_TOKENS,
) )
self.max_num_batched_tokens = max_num_batched_tokens if self.enable_chunked_prefill:
if enable_chunked_prefill:
logger.info( logger.info(
"Chunked prefill is enabled with max_num_batched_tokens=%d.", "Chunked prefill is enabled with max_num_batched_tokens=%d.",
self.max_num_batched_tokens) self.max_num_batched_tokens)
self.task: Final = task self.chunked_prefill_enabled = self.enable_chunked_prefill
self.max_num_seqs = max_num_seqs
self.max_model_len = max_model_len
self.num_lookahead_slots = num_lookahead_slots
self.delay_factor = delay_factor
self.chunked_prefill_enabled = enable_chunked_prefill
self.preemption_mode = preemption_mode
self.num_scheduler_steps = num_scheduler_steps
self.multi_step_stream_outputs = multi_step_stream_outputs
self.send_delta_data = send_delta_data
self.policy = policy
self._verify_args() self._verify_args()
def _verify_args(self) -> None: def _verify_args(self) -> None:
...@@ -2293,10 +2289,10 @@ class VllmConfig: ...@@ -2293,10 +2289,10 @@ class VllmConfig:
model_config: ModelConfig = field(default=None, init=True) # type: ignore model_config: ModelConfig = field(default=None, init=True) # type: ignore
cache_config: CacheConfig = field(default=None, init=True) # type: ignore cache_config: CacheConfig = field(default=None, init=True) # type: ignore
parallel_config: ParallelConfig = field(default=None, parallel_config: ParallelConfig = field(default_factory=ParallelConfig,
init=True) # type: ignore init=True)
scheduler_config: SchedulerConfig = field(default=None, scheduler_config: SchedulerConfig = field(default_factory=SchedulerConfig,
init=True) # type: ignore init=True)
device_config: DeviceConfig = field(default=None, device_config: DeviceConfig = field(default=None,
init=True) # type: ignore init=True) # type: ignore
load_config: LoadConfig = field(default=None, init=True) # type: ignore load_config: LoadConfig = field(default=None, init=True) # type: ignore
......
...@@ -191,6 +191,7 @@ class EngineArgs: ...@@ -191,6 +191,7 @@ class EngineArgs:
override_neuron_config: Optional[Dict[str, Any]] = None override_neuron_config: Optional[Dict[str, Any]] = None
override_pooler_config: Optional[PoolerConfig] = None override_pooler_config: Optional[PoolerConfig] = None
compilation_config: Optional[CompilationConfig] = None compilation_config: Optional[CompilationConfig] = None
worker_cls: str = "auto"
def __post_init__(self): def __post_init__(self):
if not self.tokenizer: if not self.tokenizer:
...@@ -887,6 +888,12 @@ class EngineArgs: ...@@ -887,6 +888,12 @@ class EngineArgs:
'compilers, using -O without space is also ' 'compilers, using -O without space is also '
'supported. -O3 is equivalent to -O 3.') 'supported. -O3 is equivalent to -O 3.')
parser.add_argument(
'--worker-cls',
type=str,
default="auto",
help='The worker class to use for distributed execution.')
return parser return parser
@classmethod @classmethod
...@@ -999,7 +1006,9 @@ class EngineArgs: ...@@ -999,7 +1006,9 @@ class EngineArgs:
self.tokenizer_pool_extra_config, self.tokenizer_pool_extra_config,
), ),
ray_workers_use_nsight=self.ray_workers_use_nsight, ray_workers_use_nsight=self.ray_workers_use_nsight,
distributed_executor_backend=self.distributed_executor_backend) distributed_executor_backend=self.distributed_executor_backend,
worker_cls=self.worker_cls,
)
max_model_len = model_config.max_model_len max_model_len = model_config.max_model_len
use_long_context = max_model_len > 32768 use_long_context = max_model_len > 32768
......
...@@ -115,13 +115,8 @@ class CPUExecutor(ExecutorBase): ...@@ -115,13 +115,8 @@ class CPUExecutor(ExecutorBase):
local_rank: int = 0, local_rank: int = 0,
rank: int = 0, rank: int = 0,
): ):
worker_module_name = "vllm.worker.cpu_worker"
worker_class_name = "CPUWorker"
wrapper = WorkerWrapperBase( wrapper = WorkerWrapperBase(vllm_config=self.vllm_config)
worker_module_name=worker_module_name,
worker_class_name=worker_class_name,
)
assert self.distributed_init_method is not None assert self.distributed_init_method is not None
......
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union from typing import Any, Dict, List, Optional, Set, Tuple, Union
from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
from vllm.logger import init_logger from vllm.logger import init_logger
...@@ -8,19 +8,14 @@ from vllm.prompt_adapter.request import PromptAdapterRequest ...@@ -8,19 +8,14 @@ from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sequence import ExecuteModelRequest, PoolerOutput from vllm.sequence import ExecuteModelRequest, PoolerOutput
from vllm.utils import (get_distributed_init_method, get_ip, get_open_port, from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
make_async) make_async)
from vllm.worker.worker_base import WorkerBase, WorkerWrapperBase from vllm.worker.worker_base import WorkerWrapperBase
logger = init_logger(__name__) logger = init_logger(__name__)
def create_worker(worker_module_name: str, worker_class_name: str, def create_worker(**kwargs):
worker_class_fn: Optional[Callable[[], Type[WorkerBase]]], vllm_config = kwargs.get("vllm_config")
**kwargs): wrapper = WorkerWrapperBase(vllm_config=vllm_config)
wrapper = WorkerWrapperBase(
worker_module_name=worker_module_name,
worker_class_name=worker_class_name,
worker_class_fn=worker_class_fn,
)
wrapper.init_worker(**kwargs) wrapper.init_worker(**kwargs)
return wrapper.worker return wrapper.worker
...@@ -57,43 +52,11 @@ class GPUExecutor(ExecutorBase): ...@@ -57,43 +52,11 @@ class GPUExecutor(ExecutorBase):
or (rank % self.parallel_config.tensor_parallel_size == 0), or (rank % self.parallel_config.tensor_parallel_size == 0),
) )
def _get_worker_module_and_class(
self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]:
worker_class_fn = None
if self.scheduler_config.is_multi_step:
worker_module_name = "vllm.worker.multi_step_worker"
worker_class_name = "MultiStepWorker"
elif self.speculative_config:
worker_module_name = "vllm.spec_decode.spec_decode_worker"
worker_class_name = "create_spec_worker"
else:
worker_module_name = "vllm.worker.worker"
worker_class_name = "Worker"
return (worker_module_name, worker_class_name, worker_class_fn)
def _get_create_worker_kwargs(
self,
local_rank: int = 0,
rank: int = 0,
distributed_init_method: Optional[str] = None) -> Dict:
worker_kwargs = self._get_worker_kwargs(local_rank, rank,
distributed_init_method)
(worker_module_name, worker_class_name,
worker_class_fn) = self._get_worker_module_and_class()
worker_kwargs.update(
worker_module_name=worker_module_name,
worker_class_name=worker_class_name,
worker_class_fn=worker_class_fn,
)
return worker_kwargs
def _create_worker(self, def _create_worker(self,
local_rank: int = 0, local_rank: int = 0,
rank: int = 0, rank: int = 0,
distributed_init_method: Optional[str] = None): distributed_init_method: Optional[str] = None):
return create_worker(**self._get_create_worker_kwargs( return create_worker(**self._get_worker_kwargs(
local_rank=local_rank, local_rank=local_rank,
rank=rank, rank=rank,
distributed_init_method=distributed_init_method)) distributed_init_method=distributed_init_method))
......
...@@ -48,10 +48,7 @@ class HPUExecutor(ExecutorBase): ...@@ -48,10 +48,7 @@ class HPUExecutor(ExecutorBase):
local_rank: int = 0, local_rank: int = 0,
rank: int = 0, rank: int = 0,
distributed_init_method: Optional[str] = None): distributed_init_method: Optional[str] = None):
wrapper = WorkerWrapperBase( wrapper = WorkerWrapperBase(vllm_config=self.vllm_config)
worker_module_name="vllm.worker.hpu_worker",
worker_class_name="HPUWorker",
)
wrapper.init_worker(**self._get_worker_kwargs(local_rank, rank, wrapper.init_worker(**self._get_worker_kwargs(local_rank, rank,
distributed_init_method)) distributed_init_method))
return wrapper.worker return wrapper.worker
......
...@@ -90,7 +90,7 @@ class MultiprocessingGPUExecutor(DistributedGPUExecutor): ...@@ -90,7 +90,7 @@ class MultiprocessingGPUExecutor(DistributedGPUExecutor):
result_handler, result_handler,
partial( partial(
create_worker, create_worker,
**self._get_create_worker_kwargs( **self._get_worker_kwargs(
rank=rank, rank=rank,
local_rank=rank, local_rank=rank,
distributed_init_method=distributed_init_method, distributed_init_method=distributed_init_method,
......
...@@ -7,6 +7,7 @@ from vllm.model_executor.layers.sampler import SamplerOutput ...@@ -7,6 +7,7 @@ from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.sequence import ExecuteModelRequest from vllm.sequence import ExecuteModelRequest
from vllm.utils import (get_distributed_init_method, get_ip, get_open_port, from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
make_async) make_async)
from vllm.worker.worker_base import WorkerWrapperBase
logger = init_logger(__name__) logger = init_logger(__name__)
...@@ -25,10 +26,10 @@ class NeuronExecutor(ExecutorBase): ...@@ -25,10 +26,10 @@ class NeuronExecutor(ExecutorBase):
self._init_worker() self._init_worker()
def _init_worker(self): def _init_worker(self):
from vllm.worker.neuron_worker import NeuronWorker wrapper = WorkerWrapperBase(vllm_config=self.vllm_config)
distributed_init_method = get_distributed_init_method( distributed_init_method = get_distributed_init_method(
get_ip(), get_open_port()) get_ip(), get_open_port())
self.driver_worker = NeuronWorker( self.driver_worker = wrapper.init_worker(
vllm_config=self.vllm_config, vllm_config=self.vllm_config,
local_rank=0, local_rank=0,
rank=0, rank=0,
......
...@@ -14,6 +14,7 @@ from vllm.platforms import current_platform ...@@ -14,6 +14,7 @@ from vllm.platforms import current_platform
from vllm.sequence import ExecuteModelRequest from vllm.sequence import ExecuteModelRequest
from vllm.utils import (GiB_bytes, get_distributed_init_method, get_ip, from vllm.utils import (GiB_bytes, get_distributed_init_method, get_ip,
get_open_port, make_async) get_open_port, make_async)
from vllm.worker.worker_base import WorkerWrapperBase
logger = init_logger(__name__) logger = init_logger(__name__)
...@@ -38,15 +39,12 @@ class OpenVINOExecutor(ExecutorBase): ...@@ -38,15 +39,12 @@ class OpenVINOExecutor(ExecutorBase):
self._init_worker() self._init_worker()
def _init_worker(self): def _init_worker(self):
from vllm.worker.openvino_worker import OpenVINOWorker
assert ( wrapper = WorkerWrapperBase(vllm_config=self.vllm_config)
self.parallel_config.world_size == 1
), "OpenVINOExecutor only supports single CPU socket currently."
distributed_init_method = get_distributed_init_method( distributed_init_method = get_distributed_init_method(
get_ip(), get_open_port()) get_ip(), get_open_port())
self.driver_worker = OpenVINOWorker( self.driver_worker = wrapper.init_worker(
ov_core=self.ov_core, ov_core=self.ov_core,
vllm_config=self.vllm_config, vllm_config=self.vllm_config,
local_rank=0, local_rank=0,
......
...@@ -91,17 +91,6 @@ class RayGPUExecutor(DistributedGPUExecutor): ...@@ -91,17 +91,6 @@ class RayGPUExecutor(DistributedGPUExecutor):
return ray_remote_kwargs return ray_remote_kwargs
def _get_worker_wrapper_args(self) -> Dict[str, Any]:
(worker_module_name, worker_class_name,
worker_class_fn) = self._get_worker_module_and_class()
return dict(
worker_module_name=worker_module_name,
worker_class_name=worker_class_name,
worker_class_fn=worker_class_fn,
trust_remote_code=self.model_config.trust_remote_code,
)
# child class could overwrite this to return actual env vars. # child class could overwrite this to return actual env vars.
def _get_env_vars_to_be_updated(self): def _get_env_vars_to_be_updated(self):
return self._env_vars_for_all_workers return self._env_vars_for_all_workers
...@@ -135,7 +124,6 @@ class RayGPUExecutor(DistributedGPUExecutor): ...@@ -135,7 +124,6 @@ class RayGPUExecutor(DistributedGPUExecutor):
# Create the workers. # Create the workers.
driver_ip = get_ip() driver_ip = get_ip()
worker_wrapper_kwargs = self._get_worker_wrapper_args()
for bundle_id, bundle in enumerate(placement_group.bundle_specs): for bundle_id, bundle in enumerate(placement_group.bundle_specs):
if not bundle.get("GPU", 0): if not bundle.get("GPU", 0):
continue continue
...@@ -150,7 +138,7 @@ class RayGPUExecutor(DistributedGPUExecutor): ...@@ -150,7 +138,7 @@ class RayGPUExecutor(DistributedGPUExecutor):
num_gpus=num_gpus, num_gpus=num_gpus,
scheduling_strategy=scheduling_strategy, scheduling_strategy=scheduling_strategy,
**ray_remote_kwargs, **ray_remote_kwargs,
)(RayWorkerWrapper).remote(**worker_wrapper_kwargs) )(RayWorkerWrapper).remote(vllm_config=self.vllm_config)
if self.use_ray_spmd_worker: if self.use_ray_spmd_worker:
self.workers.append(worker) self.workers.append(worker)
...@@ -161,7 +149,7 @@ class RayGPUExecutor(DistributedGPUExecutor): ...@@ -161,7 +149,7 @@ class RayGPUExecutor(DistributedGPUExecutor):
# as the resource holder for the driver process. # as the resource holder for the driver process.
self.driver_dummy_worker = worker self.driver_dummy_worker = worker
self.driver_worker = RayWorkerWrapper( self.driver_worker = RayWorkerWrapper(
**worker_wrapper_kwargs) vllm_config=self.vllm_config)
else: else:
# Else, added to the list of workers. # Else, added to the list of workers.
self.workers.append(worker) self.workers.append(worker)
......
...@@ -2,8 +2,7 @@ import asyncio ...@@ -2,8 +2,7 @@ import asyncio
import os import os
from collections import defaultdict from collections import defaultdict
from itertools import islice, repeat from itertools import islice, repeat
from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
Type)
import msgspec import msgspec
...@@ -18,7 +17,6 @@ from vllm.sequence import ExecuteModelRequest ...@@ -18,7 +17,6 @@ from vllm.sequence import ExecuteModelRequest
from vllm.utils import (_run_task_with_lock, get_distributed_init_method, from vllm.utils import (_run_task_with_lock, get_distributed_init_method,
get_ip, get_open_port, get_vllm_instance_id, get_ip, get_open_port, get_vllm_instance_id,
make_async) make_async)
from vllm.worker.worker_base import WorkerBase
if ray is not None: if ray is not None:
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
...@@ -81,33 +79,6 @@ class RayHPUExecutor(DistributedGPUExecutor): ...@@ -81,33 +79,6 @@ class RayHPUExecutor(DistributedGPUExecutor):
def finish_measurements(self): def finish_measurements(self):
self._run_workers("finish_measurements") self._run_workers("finish_measurements")
def _get_worker_module_and_class(
self
) -> Tuple[str, str, Optional[Callable[[],
Type[WorkerBase]]]]: # noqa: F821
worker_class_fn = None
if self.scheduler_config.is_multi_step:
raise NotImplementedError(
"Multi-step execution is not implemented for HPU")
elif self.speculative_config:
raise NotImplementedError(
"Speculative decoding is not implemented for HPU")
else:
worker_module_name = "vllm.worker.hpu_worker"
worker_class_name = "HPUWorker"
return (worker_module_name, worker_class_name, worker_class_fn)
def _get_worker_wrapper_args(self) -> Dict[str, Any]:
(worker_module_name, worker_class_name,
worker_class_fn) = self._get_worker_module_and_class()
return dict(
worker_module_name=worker_module_name,
worker_class_name=worker_class_name,
worker_class_fn=worker_class_fn,
trust_remote_code=self.model_config.trust_remote_code,
)
def _init_workers_ray(self, placement_group: "PlacementGroup", def _init_workers_ray(self, placement_group: "PlacementGroup",
**ray_remote_kwargs): **ray_remote_kwargs):
# Otherwise, the ray workers are allocated with a full GPU. # Otherwise, the ray workers are allocated with a full GPU.
...@@ -128,7 +99,6 @@ class RayHPUExecutor(DistributedGPUExecutor): ...@@ -128,7 +99,6 @@ class RayHPUExecutor(DistributedGPUExecutor):
# Create the workers. # Create the workers.
driver_ip = get_ip() driver_ip = get_ip()
worker_wrapper_kwargs = self._get_worker_wrapper_args()
for bundle_id, bundle in enumerate(placement_group.bundle_specs): for bundle_id, bundle in enumerate(placement_group.bundle_specs):
if not bundle.get("HPU", 0): if not bundle.get("HPU", 0):
continue continue
...@@ -144,7 +114,7 @@ class RayHPUExecutor(DistributedGPUExecutor): ...@@ -144,7 +114,7 @@ class RayHPUExecutor(DistributedGPUExecutor):
resources={'HPU': num_gpus}, resources={'HPU': num_gpus},
scheduling_strategy=scheduling_strategy, scheduling_strategy=scheduling_strategy,
**ray_remote_kwargs, **ray_remote_kwargs,
)(RayWorkerWrapper).remote(**worker_wrapper_kwargs) )(RayWorkerWrapper).remote(vllm_config=self.vllm_config)
if self.use_ray_spmd_worker: if self.use_ray_spmd_worker:
self.workers.append(worker) self.workers.append(worker)
...@@ -155,7 +125,7 @@ class RayHPUExecutor(DistributedGPUExecutor): ...@@ -155,7 +125,7 @@ class RayHPUExecutor(DistributedGPUExecutor):
# as the resource holder for the driver process. # as the resource holder for the driver process.
self.driver_dummy_worker = worker self.driver_dummy_worker = worker
self.driver_worker = RayWorkerWrapper( self.driver_worker = RayWorkerWrapper(
**worker_wrapper_kwargs) vllm_config=self.vllm_config)
else: else:
# Else, added to the list of workers. # Else, added to the list of workers.
self.workers.append(worker) self.workers.append(worker)
......
...@@ -69,14 +69,6 @@ class RayTPUExecutor(TPUExecutor): ...@@ -69,14 +69,6 @@ class RayTPUExecutor(TPUExecutor):
placement_group_bundle_index=bundle_id, placement_group_bundle_index=bundle_id,
) )
assert self.speculative_config is None
if self.scheduler_config.is_multi_step:
worker_module_name = "vllm.worker.multi_step_tpu_worker"
worker_class_name = "MultiStepTPUWorker"
else:
worker_module_name = "vllm.worker.tpu_worker"
worker_class_name = "TPUWorker"
# GKE does not fetch environment information from metadata server # GKE does not fetch environment information from metadata server
# and instead sets these from within the Ray process. Therefore we # and instead sets these from within the Ray process. Therefore we
# need to override the Ray environment variables manually. # need to override the Ray environment variables manually.
...@@ -95,11 +87,7 @@ class RayTPUExecutor(TPUExecutor): ...@@ -95,11 +87,7 @@ class RayTPUExecutor(TPUExecutor):
resources={"TPU": 1}, resources={"TPU": 1},
scheduling_strategy=scheduling_strategy, scheduling_strategy=scheduling_strategy,
**ray_remote_kwargs, **ray_remote_kwargs,
)(RayWorkerWrapper).remote( )(RayWorkerWrapper).remote(vllm_config=self.vllm_config)
worker_module_name=worker_module_name,
worker_class_name=worker_class_name,
trust_remote_code=self.model_config.trust_remote_code,
)
if override_env: if override_env:
worker.override_env_vars.remote(override_env) worker.override_env_vars.remote(override_env)
...@@ -109,10 +97,7 @@ class RayTPUExecutor(TPUExecutor): ...@@ -109,10 +97,7 @@ class RayTPUExecutor(TPUExecutor):
# as the resource holder for the driver process. # as the resource holder for the driver process.
self.driver_dummy_worker = worker self.driver_dummy_worker = worker
self.driver_worker = RayWorkerWrapper( self.driver_worker = RayWorkerWrapper(
worker_module_name=worker_module_name, vllm_config=self.vllm_config)
worker_class_name=worker_class_name,
trust_remote_code=self.model_config.trust_remote_code,
)
else: else:
# Else, added to the list of workers. # Else, added to the list of workers.
self.workers.append(worker) self.workers.append(worker)
......
from typing import Callable, List, Optional, Tuple, Type, Union from typing import List, Optional, Union
from vllm.executor.executor_base import ExecutorAsyncBase from vllm.executor.executor_base import ExecutorAsyncBase
from vllm.executor.gpu_executor import GPUExecutor from vllm.executor.gpu_executor import GPUExecutor
...@@ -6,7 +6,6 @@ from vllm.logger import init_logger ...@@ -6,7 +6,6 @@ from vllm.logger import init_logger
from vllm.model_executor.layers.sampler import SamplerOutput from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.sequence import ExecuteModelRequest, PoolerOutput from vllm.sequence import ExecuteModelRequest, PoolerOutput
from vllm.utils import make_async from vllm.utils import make_async
from vllm.worker.worker_base import WorkerBase
logger = init_logger(__name__) logger = init_logger(__name__)
...@@ -22,17 +21,6 @@ class XPUExecutor(GPUExecutor): ...@@ -22,17 +21,6 @@ class XPUExecutor(GPUExecutor):
GPUExecutor._init_executor(self) GPUExecutor._init_executor(self)
def _get_worker_module_and_class(
self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]:
worker_class_fn = None
if self.speculative_config is not None:
raise NotImplementedError(
"XPU does not support speculative decoding")
else:
worker_module_name = "vllm.worker.xpu_worker"
worker_class_name = "XPUWorker"
return (worker_module_name, worker_class_name, worker_class_fn)
def execute_model( def execute_model(
self, execute_model_req: ExecuteModelRequest self, execute_model_req: ExecuteModelRequest
) -> Optional[List[Union[SamplerOutput, PoolerOutput]]]: ) -> Optional[List[Union[SamplerOutput, PoolerOutput]]]:
......
...@@ -84,3 +84,5 @@ class CpuPlatform(Platform): ...@@ -84,3 +84,5 @@ class CpuPlatform(Platform):
"distributed executor backend."), "distributed executor backend."),
parallel_config.distributed_executor_backend) parallel_config.distributed_executor_backend)
parallel_config.distributed_executor_backend = "mp" parallel_config.distributed_executor_backend = "mp"
if parallel_config.worker_cls == "auto":
parallel_config.worker_cls = "vllm.worker.cpu_worker.CPUWorker"
...@@ -4,7 +4,7 @@ pynvml. However, it should not initialize cuda context. ...@@ -4,7 +4,7 @@ pynvml. However, it should not initialize cuda context.
import os import os
from functools import lru_cache, wraps from functools import lru_cache, wraps
from typing import Callable, List, Tuple, TypeVar from typing import TYPE_CHECKING, Callable, List, Tuple, TypeVar
import pynvml import pynvml
import torch import torch
...@@ -16,6 +16,11 @@ from vllm.logger import init_logger ...@@ -16,6 +16,11 @@ from vllm.logger import init_logger
from .interface import DeviceCapability, Platform, PlatformEnum from .interface import DeviceCapability, Platform, PlatformEnum
if TYPE_CHECKING:
from vllm.config import VllmConfig
else:
VllmConfig = None
logger = init_logger(__name__) logger = init_logger(__name__)
_P = ParamSpec("_P") _P = ParamSpec("_P")
...@@ -157,3 +162,17 @@ class CudaPlatform(Platform): ...@@ -157,3 +162,17 @@ class CudaPlatform(Platform):
" machine has no NVLink equipped.") " machine has no NVLink equipped.")
return False return False
return True return True
@classmethod
def check_and_update_config(cls, vllm_config: VllmConfig) -> None:
parallel_config = vllm_config.parallel_config
scheduler_config = vllm_config.scheduler_config
if parallel_config.worker_cls == "auto":
if scheduler_config.is_multi_step:
parallel_config.worker_cls = \
"vllm.worker.multi_step_worker.MultiStepWorker"
elif vllm_config.speculative_config:
parallel_config.worker_cls = \
"vllm.spec_decode.spec_decode_worker.create_spec_worker"
else:
parallel_config.worker_cls = "vllm.worker.worker.Worker"
from typing import TYPE_CHECKING
import torch import torch
from .interface import Platform, PlatformEnum, _Backend from .interface import Platform, PlatformEnum, _Backend
if TYPE_CHECKING:
from vllm.config import VllmConfig
else:
VllmConfig = None
class HpuPlatform(Platform): class HpuPlatform(Platform):
_enum = PlatformEnum.HPU _enum = PlatformEnum.HPU
...@@ -14,3 +21,19 @@ class HpuPlatform(Platform): ...@@ -14,3 +21,19 @@ class HpuPlatform(Platform):
@staticmethod @staticmethod
def inference_mode(): def inference_mode():
return torch.no_grad() return torch.no_grad()
@classmethod
def check_and_update_config(cls, vllm_config: VllmConfig) -> None:
scheduler_config = vllm_config.scheduler_config
if scheduler_config.is_multi_step:
raise NotImplementedError(
"Multi-step execution is not implemented for HPU")
if vllm_config.speculative_config is not None:
raise NotImplementedError(
"Speculative decoding is not implemented for HPU")
parallel_config = vllm_config.parallel_config
if parallel_config.worker_cls == "auto":
parallel_config.worker_cls = "vllm.worker.hpu_worker.HPUWorker"
from typing import TYPE_CHECKING
from .interface import Platform, PlatformEnum from .interface import Platform, PlatformEnum
if TYPE_CHECKING:
from vllm.config import VllmConfig
else:
VllmConfig = None
class NeuronPlatform(Platform): class NeuronPlatform(Platform):
_enum = PlatformEnum.NEURON _enum = PlatformEnum.NEURON
...@@ -8,3 +15,10 @@ class NeuronPlatform(Platform): ...@@ -8,3 +15,10 @@ class NeuronPlatform(Platform):
@classmethod @classmethod
def get_device_name(cls, device_id: int = 0) -> str: def get_device_name(cls, device_id: int = 0) -> str:
return "neuron" return "neuron"
@classmethod
def check_and_update_config(cls, vllm_config: VllmConfig) -> None:
parallel_config = vllm_config.parallel_config
if parallel_config.worker_cls == "auto":
parallel_config.worker_cls = \
"vllm.worker.neuron_worker.NeuronWorker"
from typing import TYPE_CHECKING
import torch import torch
import vllm.envs as envs import vllm.envs as envs
...@@ -5,6 +7,11 @@ from vllm.logger import init_logger ...@@ -5,6 +7,11 @@ from vllm.logger import init_logger
from .interface import Platform, PlatformEnum, _Backend from .interface import Platform, PlatformEnum, _Backend
if TYPE_CHECKING:
from vllm.config import VllmConfig
else:
VllmConfig = None
logger = init_logger(__name__) logger = init_logger(__name__)
...@@ -38,3 +45,14 @@ class OpenVinoPlatform(Platform): ...@@ -38,3 +45,14 @@ class OpenVinoPlatform(Platform):
def is_pin_memory_available(self) -> bool: def is_pin_memory_available(self) -> bool:
logger.warning("Pin memory is not supported on OpenViNO.") logger.warning("Pin memory is not supported on OpenViNO.")
return False return False
@classmethod
def check_and_update_config(cls, vllm_config: VllmConfig) -> None:
parallel_config = vllm_config.parallel_config
assert (
parallel_config.world_size == 1
), "OpenVINOExecutor only supports single CPU socket currently."
if parallel_config.worker_cls == "auto":
parallel_config.worker_cls = \
"vllm.worker.openvino_worker.OpenVINOWorker"
import os import os
from functools import lru_cache from functools import lru_cache
from typing import TYPE_CHECKING
import torch import torch
...@@ -7,6 +8,11 @@ from vllm.logger import init_logger ...@@ -7,6 +8,11 @@ from vllm.logger import init_logger
from .interface import DeviceCapability, Platform, PlatformEnum, _Backend from .interface import DeviceCapability, Platform, PlatformEnum, _Backend
if TYPE_CHECKING:
from vllm.config import VllmConfig
else:
VllmConfig = None
logger = init_logger(__name__) logger = init_logger(__name__)
try: try:
...@@ -58,3 +64,17 @@ class RocmPlatform(Platform): ...@@ -58,3 +64,17 @@ class RocmPlatform(Platform):
def get_device_total_memory(cls, device_id: int = 0) -> int: def get_device_total_memory(cls, device_id: int = 0) -> int:
device_props = torch.cuda.get_device_properties(device_id) device_props = torch.cuda.get_device_properties(device_id)
return device_props.total_memory return device_props.total_memory
@classmethod
def check_and_update_config(cls, vllm_config: VllmConfig) -> None:
parallel_config = vllm_config.parallel_config
scheduler_config = vllm_config.scheduler_config
if parallel_config.worker_cls == "auto":
if scheduler_config.is_multi_step:
parallel_config.worker_cls = \
"vllm.worker.multi_step_worker.MultiStepWorker"
elif vllm_config.speculative_config:
parallel_config.worker_cls = \
"vllm.spec_decode.spec_decode_worker.create_spec_worker"
else:
parallel_config.worker_cls = "vllm.worker.worker.Worker"
...@@ -48,3 +48,15 @@ class TpuPlatform(Platform): ...@@ -48,3 +48,15 @@ class TpuPlatform(Platform):
if compilation_config.backend == "": if compilation_config.backend == "":
compilation_config.backend = "openxla" compilation_config.backend = "openxla"
assert vllm_config.speculative_config is None, \
"TPU does not support speculative decoding"
parallel_config = vllm_config.parallel_config
scheduler_config = vllm_config.scheduler_config
if parallel_config.worker_cls == "auto":
if scheduler_config.is_multi_step:
parallel_config.worker_cls = \
"vllm.worker.multi_step_tpu_worker.MultiStepTPUWorker"
else:
parallel_config.worker_cls = "vllm.worker.tpu_worker.TPUWorker"
...@@ -57,6 +57,10 @@ class XPUPlatform(Platform): ...@@ -57,6 +57,10 @@ class XPUPlatform(Platform):
"mode.") "mode.")
model_config.enforce_eager = True model_config.enforce_eager = True
if vllm_config.speculative_config is not None:
raise NotImplementedError(
"XPU does not support speculative decoding")
# check and update parallel config # check and update parallel config
parallel_config = vllm_config.parallel_config parallel_config = vllm_config.parallel_config
if (parallel_config.distributed_executor_backend is not None if (parallel_config.distributed_executor_backend is not None
...@@ -66,3 +70,5 @@ class XPUPlatform(Platform): ...@@ -66,3 +70,5 @@ class XPUPlatform(Platform):
" executor backend.", " executor backend.",
parallel_config.distributed_executor_backend) parallel_config.distributed_executor_backend)
parallel_config.distributed_executor_backend = "ray" parallel_config.distributed_executor_backend = "ray"
if parallel_config.worker_cls == "auto":
parallel_config.worker_cls = "vllm.worker.xpu_worker.XPUWorker"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment