import os
from typing import Optional, Tuple

from vllm.config import (CacheConfig, DeviceConfig, LoadConfig, LoRAConfig,
                         ModelConfig, ObservabilityConfig, ParallelConfig,
                         PromptAdapterConfig, SchedulerConfig,
                         SpeculativeConfig, VllmConfig)
from vllm.logger import init_logger
from vllm.utils import get_distributed_init_method, get_ip, get_open_port
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.worker.gpu_worker import Worker

logger = init_logger(__name__)


class GPUExecutor:
    """Executor that drives a single in-process ``Worker``.

    The constructor stores the individual config objects, bundles them into
    a ``VllmConfig`` for the worker, creates the worker, initializes it and
    loads the model. The remaining methods delegate to that worker.
    """

    def __init__(
        self,
        model_config: ModelConfig,
        cache_config: CacheConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        load_config: LoadConfig,
        lora_config: Optional[LoRAConfig],
        speculative_config: Optional[SpeculativeConfig],
        prompt_adapter_config: Optional[PromptAdapterConfig],
        observability_config: Optional[ObservabilityConfig],
    ) -> None:
        """Store the configs, then create, initialize and load the worker."""
        self.model_config = model_config
        self.cache_config = cache_config
        self.lora_config = lora_config
        self.load_config = load_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.speculative_config = speculative_config
        self.prompt_adapter_config = prompt_adapter_config
        self.observability_config = observability_config

        # `_create_worker` hands the worker a single `vllm_config` object, so
        # it must exist before the worker is built. Previously this attribute
        # was never assigned and construction failed with AttributeError.
        # NOTE(review): assumes VllmConfig accepts these field names as
        # keyword arguments -- confirm against vllm.config.
        self.vllm_config = VllmConfig(
            model_config=model_config,
            cache_config=cache_config,
            parallel_config=parallel_config,
            scheduler_config=scheduler_config,
            device_config=device_config,
            load_config=load_config,
            lora_config=lora_config,
            speculative_config=speculative_config,
            prompt_adapter_config=prompt_adapter_config,
            observability_config=observability_config,
        )

        self.worker = self._create_worker()
        self.worker.initialize()
        self.worker.load_model()

    def _create_worker(
            self,
            local_rank: int = 0,
            rank: int = 0,
            distributed_init_method: Optional[str] = None) -> Worker:
        """Create and return the ``Worker`` for the given rank.

        Args:
            local_rank: Forwarded to the ``Worker`` constructor.
            rank: Forwarded to the ``Worker`` constructor.
            distributed_init_method: Forwarded to the ``Worker`` constructor.
                When ``None``, one is derived from this host's IP and a
                freshly chosen open port.

        Returns:
            The newly constructed worker. It is not initialized here.

        Side effects:
            Sets ``NCCL_CUMEM_ENABLE=0`` in the process environment.
        """
        # see https://github.com/NVIDIA/nccl/issues/1234
        os.environ['NCCL_CUMEM_ENABLE'] = '0'

        if distributed_init_method is None:
            distributed_init_method = get_distributed_init_method(
                get_ip(), get_open_port())
        return Worker(
            vllm_config=self.vllm_config,
            local_rank=local_rank,
            rank=rank,
            distributed_init_method=distributed_init_method,
        )

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks by invoking the
        underlying worker.
        """
        return self.worker.determine_num_available_blocks()

    def initialize_cache(self, num_gpu_blocks: int) -> None:
        """Initialize the KV cache by invoking the underlying worker, then
        compile or warm up the model.
        """
        # NOTE: This is logged in the executor because there can be >1 worker
        # with other executors. We could log in the engine level, but work
        # remains to abstract away the device for non-GPU configurations.
        logger.info("# GPU blocks: %d", num_gpu_blocks)
        self.worker.initialize_cache(num_gpu_blocks)
        self.worker.compile_or_warm_up_model()

    def execute_model(
        self,
        scheduler_output,
    ) -> ModelRunnerOutput:
        """Run the worker on ``scheduler_output`` and return its output."""
        output = self.worker.execute_model(scheduler_output)
        return output

    def check_health(self) -> None:
        """No-op health check."""
        # GPUExecutor will always be healthy as long as
        # it's running.
        return