# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import os
import sys
from typing import Any

import torch

from vllm.config import VllmConfig
from vllm.logger import init_logger
from vllm.platforms import CpuArchEnum, current_platform
from vllm.profiler.wrapper import TorchProfilerWrapper
from vllm.utils.torch_utils import set_random_seed
from vllm.v1.worker.cpu_model_runner import CPUModelRunner
from vllm.v1.worker.gpu_worker import Worker, init_worker_distributed_environment

logger = init_logger(__name__)


def _warn_if_not_preloaded(name: str) -> None:
    """Log a warning when ``name`` does not occur in ``LD_PRELOAD``.

    The check is a plain substring match against the raw value of the
    ``LD_PRELOAD`` environment variable (an unset variable is treated as
    an empty string).
    """
    if name in os.environ.get("LD_PRELOAD", ""):
        return
    logger.warning(
        "%s is not found in LD_PRELOAD. "
        "For best performance, please follow the section "
        "`set LD_PRELOAD` in "
        "https://docs.vllm.ai/en/latest/getting_started/installation/cpu/ "
        "to setup required pre-loaded libraries.",
        name,
    )


class CPUWorker(Worker):
    """Worker variant that drives a ``CPUModelRunner`` on ``torch.device("cpu")``."""

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        is_driver_worker: bool = False,
    ):
        """Initialize the base worker, then apply the CPU-specific setup.

        All arguments are forwarded unchanged to ``Worker.__init__``.
        """
        super().__init__(
            vllm_config,
            local_rank,
            rank,
            distributed_init_method,
            is_driver_worker=is_driver_worker,
        )

        # Custom all-reduce is always switched off for this worker.
        self.parallel_config.disable_custom_all_reduce = True

        # Torch profiler: stays None unless profiler_config selects "torch".
        self.profiler: Any | None = None
        prof_cfg = vllm_config.profiler_config
        if prof_cfg.profiler == "torch":
            self.profiler = TorchProfilerWrapper(
                prof_cfg,
                worker_name=f"{vllm_config.instance_id}-rank-{self.rank}",
                local_rank=self.local_rank,
                activities=["CPU"],
            )

    def init_device(self):
        """Prepare the process for CPU execution and build the model runner.

        Steps, in order: warn about missing pre-loaded libraries, replace
        ``torch.set_num_threads`` with a warning-only stub, export
        ``VLLM_DIST_IDENT``, initialize the distributed environment, seed
        the RNGs and construct ``self.model_runner``.
        """
        # Check whether critical libraries are loaded (Linux only; the
        # libiomp check additionally requires an x86 CPU).
        if sys.platform.startswith("linux"):
            _warn_if_not_preloaded("libtcmalloc")
            if current_platform.get_cpu_architecture() == CpuArchEnum.X86:
                _warn_if_not_preloaded("libiomp")

        def skip_set_num_threads(x: int):
            logger.warning(
                "CPU backend doesn't allow to use "
                "`torch.set_num_threads` after the thread binding, skip it."
            )

        # From here on, any call to torch.set_num_threads only logs a warning.
        torch.set_num_threads = skip_set_num_threads

        # Note: unique identifier for creating allreduce shared memory. It is
        # whatever follows the last ":" of the init method string.
        dist_ident = self.distributed_init_method.rsplit(":", 1)[-1]
        os.environ["VLLM_DIST_IDENT"] = dist_ident

        # Initialize the distributed environment.
        init_worker_distributed_environment(
            self.vllm_config,
            self.rank,
            self.distributed_init_method,
            self.local_rank,
            current_platform.dist_backend,
        )

        # Set random seed.
        set_random_seed(self.model_config.seed)

        # Construct the model runner.
        cpu_device = torch.device("cpu")
        self.model_runner: CPUModelRunner = CPUModelRunner(
            self.vllm_config, cpu_device
        )

    def sleep(self, level: int = 1) -> None:
        """No-op apart from a warning; ``level`` is ignored."""
        logger.warning("sleep mode is not supported on CPU, ignore it.")

    def wake_up(self, tags: list[str] | None = None) -> None:
        """No-op apart from a warning; ``tags`` is ignored."""
        logger.warning("sleep mode is not supported on CPU, ignore it.")

    def determine_available_memory(self) -> int:
        """Return ``cache_config.cpu_kvcache_space_bytes``, or 0 if it is falsy."""
        kvcache_bytes = self.cache_config.cpu_kvcache_space_bytes
        return kvcache_bytes if kvcache_bytes else 0

    def compile_or_warm_up_model(self) -> float:
        """Re-seed, warm up the model runner and return the compilation time."""
        # Reset the seed to ensure that the random state is not affected by
        # the model initialization and profiling.
        set_random_seed(self.model_config.seed)
        self.model_runner.warming_up_model()
        return self.compilation_config.compilation_time

    def profile(self, is_start: bool = True, profile_prefix: str | None = None):
        """Start or stop the torch profiler.

        Args:
            is_start: ``True`` starts the profiler, ``False`` stops it.
            profile_prefix: Not used by this implementation.
                NOTE(review): presumably kept for signature parity with
                other workers - confirm.

        Raises:
            RuntimeError: If no profiler was configured in ``__init__``.
        """
        active_profiler = self.profiler
        if active_profiler is None:
            raise RuntimeError("Profiler is not enabled.")
        toggle = active_profiler.start if is_start else active_profiler.stop
        toggle()