"vscode:/vscode.git/clone" did not exist on "f6084f63248a89df52bed9d9c24d6604f87e51f3"
cpu.py 15.7 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3

4
import glob
5
import json
6
import os
7
import platform
8
import subprocess
9
import sys
10
from dataclasses import dataclass
11
from importlib.util import find_spec
12
from typing import TYPE_CHECKING
13

14
import regex as re
15
16
import torch

17
from vllm import envs
18
from vllm.logger import init_logger
19
from vllm.utils import DEFAULT_MAX_NUM_BATCHED_TOKENS
20

21
from .interface import CpuArchEnum, Platform, PlatformEnum
22
23

logger = init_logger(__name__)
24

25
if TYPE_CHECKING:
26
    from vllm.attention.backends.registry import _Backend
27
28
    from vllm.config import VllmConfig
else:
29
    _Backend = None
30
31
    VllmConfig = None

32

33
def get_max_threads(pid=0):
34
    if hasattr(os, "sched_getaffinity"):
35
        return len(os.sched_getaffinity(pid))
36
    elif platform.system() == "Darwin":
37
38
39
40
41
        return os.cpu_count()
    else:
        raise NotImplementedError("Unsupported OS")


42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@dataclass
class LogicalCPUInfo:
    id: int = -1
    physical_core: int = -1
    numa_node: int = -1

    @classmethod
    def _int(cls, value: str) -> int:
        try:
            int_value = int(value)
        except Exception:
            int_value = -1
        return int_value

    @staticmethod
    def json_decoder(obj_dict: dict):
        id = obj_dict.get("cpu")
        physical_core = obj_dict.get("core")
        numa_node = obj_dict.get("node")

        if not (id is None or physical_core is None or numa_node is None):
            return LogicalCPUInfo(
                id=LogicalCPUInfo._int(id),
                physical_core=LogicalCPUInfo._int(physical_core),
66
67
                numa_node=LogicalCPUInfo._int(numa_node),
            )
68
69
70
71
        else:
            return obj_dict


72
73
class CpuPlatform(Platform):
    _enum = PlatformEnum.CPU
74
    device_name: str = "cpu"
75
    device_type: str = "cpu"
76
    dispatch_key: str = "CPU"
77
    dist_backend: str = "gloo"
78
    device_control_env_var = "CPU_VISIBLE_MEMORY_NODES"
79

80
    @property
81
    def supported_dtypes(self) -> list[torch.dtype]:
82
83
        if self.get_cpu_architecture() == CpuArchEnum.POWERPC:
            return [torch.bfloat16, torch.float32]
84
85
86
87
88
89
90
91
92
        elif self.get_cpu_architecture() == CpuArchEnum.ARM and sys.platform.startswith(
            "darwin"
        ):
            if (
                subprocess.check_output(
                    ["sysctl -n hw.optional.arm.FEAT_BF16"], shell=True
                ).strip()
                == b"1"
            ):
93
                return [torch.bfloat16, torch.float16, torch.float32]
94
            return [torch.float16, torch.float32]
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
        elif self.get_cpu_architecture() == CpuArchEnum.RISCV:
            # Workaround for Issue #25655: RISC-V scheduler bug with float16
            #
            # Background:
            # - RISC-V currently uses scalar code path
            # - There is a latent bug in the vLLM scheduler that provides
            # invalid
            #   physical_block_idx values under certain conditions
            # - This bug causes segmentation faults when using float16
            # dtype on RISC-V
            # - Testing shows that forcing float32 successfully bypasses
            # this issue
            #
            # Technical details:
            # - The bug manifests as out-of-bounds physical_block_idx in
            # block_tables
            # - Only occurs on RISC-V hardware
            # tested on Sophgo SG2044
            # - Does not reproduce on x86 or other architectures
            # - Root cause is in Python-level scheduling logic,
            # not C++ kernels
            #
            # This is a temporary workaround until the scheduler bug is fixed.
            # See: https://github.com/vllm-project/vllm/issues/25655
            return [torch.float32]
120
121
122
        # x86/aarch64 CPU has supported both bf16 and fp16 natively.
        return [torch.bfloat16, torch.float16, torch.float32]

123
124
    @classmethod
    def get_device_name(cls, device_id: int = 0) -> str:
125
126
        return "cpu"

127
    @classmethod
128
129
130
131
132
    def get_attn_backend_cls(
        cls,
        selected_backend: "_Backend",
        head_size: int,
        dtype: torch.dtype,
133
        kv_cache_dtype: str | None,
134
135
136
137
138
139
        block_size: int,
        use_v1: bool,
        use_mla: bool,
        has_sink: bool,
        use_sparse: bool,
    ) -> str:
140
        from vllm.attention.backends.registry import _Backend
141

142
        if selected_backend and selected_backend != _Backend.TORCH_SDPA:
143
            logger.info("Cannot use %s backend on CPU.", selected_backend)
Thien Tran's avatar
Thien Tran committed
144
        if use_mla:
145
            raise NotImplementedError("MLA is not supported on CPU.")
146
        if use_sparse:
147
            raise NotImplementedError("Sparse Attention is not supported on CPU.")
148
        logger.info("Using Torch SDPA backend.")
149
150
151
        if not use_v1:
            raise ValueError("CPU backend only supports V1.")
        return "vllm.v1.attention.backends.cpu_attn.TorchSDPABackend"
152

153
154
    @classmethod
    def get_device_total_memory(cls, device_id: int = 0) -> int:
155
        from vllm.utils.mem_constants import GiB_bytes
156
157
158
159
160
161

        kv_cache_space = envs.VLLM_CPU_KVCACHE_SPACE
        if kv_cache_space is None:
            kv_cache_space = 4 * GiB_bytes  # type: ignore
            logger.warning_once(
                "Environment variable VLLM_CPU_KVCACHE_SPACE (GiB) "
162
163
                "for CPU backend is not set, using 4 by default."
            )
164
165
166
167
        else:
            kv_cache_space *= GiB_bytes

        return kv_cache_space
168

169
170
171
172
173
174
175
    @classmethod
    def set_device(cls, device: torch.device) -> None:
        """
        Set the device for the current platform.
        """
        torch.cpu.set_device(device)

176
177
    @classmethod
    def inference_mode(cls):
178
        return torch.no_grad()
179
180
181
182
183

    @classmethod
    def check_and_update_config(cls, vllm_config: VllmConfig) -> None:
        model_config = vllm_config.model_config

184
185
        if model_config is not None:
            model_config.disable_cascade_attn = True
186

187
188
        cache_config = vllm_config.cache_config

189
        ipex_available = find_spec("intel_extension_for_pytorch") is not None
190

191
        if cache_config and cache_config.block_size is None:
192
            cache_config.block_size = 128 if ipex_available else 16
193

194
        if not ipex_available and cache_config.block_size != 16:
195
196
            raise RuntimeError(
                f"--block-size={cache_config.block_size} requires"
197
198
                " intel_extension_for_pytorch"
            )
199

200
        scheduler_config = vllm_config.scheduler_config
201
202
203
204
205
206
207
208
        if (
            scheduler_config.chunked_prefill_enabled
            or cache_config.enable_prefix_caching
        ) and cache_config.cache_dtype != "auto":
            raise RuntimeError(
                "Chunked-prefill and prefix-cache on the CPU "
                "backend is not compatible with FP8 KV cache."
            )
209
210
211
212

        if cache_config.cache_dtype == "fp8_e4m3":
            cache_config.cache_dtype = "fp8_e5m2"
            logger.warning(
213
214
215
216
217
218
219
220
221
222
223
224
                "CPU backend doesn't support fp8_e4m3 KV cache type, cast to fp8_e5m2."
            )

        if (
            cache_config.cache_dtype != "auto"
            and model_config is not None
            and model_config.dtype == torch.half
        ):
            logger.warning(
                "FP8 KV cache on the CPU backend only does not"
                " support fp16 for now, cast to bf16."
            )
225
226
            model_config.dtype = torch.bfloat16

227
        cache_config.cpu_kvcache_space_bytes = CpuPlatform.get_device_total_memory()
228
229

        parallel_config = vllm_config.parallel_config
230
231
232
233
234
235
236
237
238
239
240
241
        if (
            parallel_config.world_size > 1
            and parallel_config.distributed_executor_backend is not None
            and parallel_config.distributed_executor_backend != "mp"
        ):
            logger.warning(
                (
                    "%s is not supported on CPU, fallback to mp "
                    "distributed executor backend."
                ),
                parallel_config.distributed_executor_backend,
            )
242
            parallel_config.distributed_executor_backend = "mp"
243
        if parallel_config.worker_cls == "auto":
244
            parallel_config.worker_cls = "vllm.v1.worker.cpu_worker.CPUWorker"
245
246
        # Disable DBO
        if parallel_config.enable_dbo:
247
            logger.warning("Dual-Batch Overlap is not supported on CPU, disabled.")
248
            parallel_config.enable_dbo = False
249
250

        # Note: workaround for v1 gpu_model_runner
251
        from vllm.config import CompilationMode
252

253
254
255
        vllm_config.compilation_config.cudagraph_capture_sizes = []

        compilation_config = vllm_config.compilation_config
256
        if vllm_config.compilation_config.mode == CompilationMode.VLLM_COMPILE:
257
258
259
260
261
262
263
264
265
266
267
268
            # Note: vLLM V1 is using PIECEWISE level compilation, which will
            # take time to compile kernels just-in-time with the inductor
            # backend. For CPU CI tests, most of them are executed fast and
            # compilations consume too much time, even with torch compile
            # cache. So use VLLM_CPU_CI_ENV to indicate the CI environment,
            # and just execute model with dynamo + eager mode to save time.
            # VLLM_CPU_CI_ENV is only used as an internal variable.
            if os.environ.get("VLLM_CPU_CI_ENV", "0") != "0":
                backend = "eager"
            else:
                backend = "inductor"

269
            compilation_config.mode = CompilationMode.DYNAMO_TRACE_ONCE
270
            compilation_config.backend = backend
271
272
273
274
275
276
277
278
            compilation_config.inductor_compile_config.update(
                {
                    "dce": True,
                    "size_asserts": False,
                    "nan_asserts": False,
                    "epilogue_fusion": True,
                }
            )
279
280

        if vllm_config.lora_config is not None:
281
            compilation_config.mode = CompilationMode.NONE
282

283
284
285
286
287
288
        assert vllm_config.device_config.device_type == "cpu"

        #
        # Environment variables for CPU executor
        #

289
290
291
        os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"

        # Note: to avoid the error 'nthreads cannot be larger than environment
292
        # variable "NUMEXPR_MAX_THREADS" (64)'.
293
        os.environ["NUMEXPR_MAX_THREADS"] = str(get_max_threads())
294

295
296
297
298
299
300
301
        if envs.VLLM_CPU_OMP_THREADS_BIND != "nobind":
            # Set default threads num for OpenMP parallel
            os.environ["OMP_NUM_THREADS"] = str(torch.get_num_threads())
        else:
            # In this case, setting the OpenMP configuration via
            # OMP_NUM_THREADS is up to the user.
            logger.info("Disabling binding processes to CPU cores...")
302

303
304
305
        # Disable torch async compiling which won't work with daemonic processes
        os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1"

306
        # Disable multi-stream for shared experts as no Stream on CPU
307
        os.environ["VLLM_DISABLE_SHARED_EXPERTS_STREAM"] = "1"
308

309
        # Intel OpenMP setting
310
311
        ld_preload_str = os.getenv("LD_PRELOAD", "")
        if "libiomp5.so" in ld_preload_str:
312
313
            # The time(milliseconds) that a thread should wait after
            # completing the execution of a parallel region, before sleeping.
314
            os.environ["KMP_BLOCKTIME"] = "1"
315
            # Prevents the CPU to run into low performance state
316
            os.environ["KMP_TPAUSE"] = "0"
317
            # Provides fine granularity parallelism
318
319
320
            os.environ["KMP_FORKJOIN_BARRIER_PATTERN"] = "dist,dist"
            os.environ["KMP_PLAIN_BARRIER_PATTERN"] = "dist,dist"
            os.environ["KMP_REDUCTION_BARRIER_PATTERN"] = "dist,dist"
321

322
323
        if (
            platform.system() == "Linux"
324
325
            and Platform.get_cpu_architecture()
            in (CpuArchEnum.ARM, CpuArchEnum.POWERPC)
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
            and not ("libomp" in ld_preload_str or "libgomp" in ld_preload_str)
        ):
            # We need to LD_PRELOAD PyTorch's libgomp, otherwise only
            # one core will be properly utilized when we thread-bind
            # See: https://github.com/vllm-project/vllm/issues/27369
            # TODO: Remove once:
            # https://github.com/pytorch/pytorch/issues/166087 is fixed

            # We need to find the location of PyTorch's libgomp
            torch_pkg = os.path.dirname(torch.__file__)
            site_root = os.path.dirname(torch_pkg)
            torch_libs = os.path.join(site_root, "torch.libs")
            pytorch_libgomp_so_candidates = glob.glob(
                os.path.join(torch_libs, "libgomp-*.so*")
            )
            if pytorch_libgomp_so_candidates:
                pytorch_libgomp_so = pytorch_libgomp_so_candidates[0]
                if ld_preload_str:
                    ld_preload_str += ":"
                ld_preload_str += pytorch_libgomp_so
                os.environ["LD_PRELOAD"] = ld_preload_str

348
349
        # To hint IPEX uses shared memory based AllReduce
        os.environ["LOCAL_WORLD_SIZE"] = str(
350
351
            vllm_config.parallel_config.tensor_parallel_size
        )
352

353
        if model_config is not None and model_config.use_mla:
354
355
            logger.info(
                "MLA is enabled on a non-GPU platform; forcing chunked "
356
357
                "prefill and prefix caching to be disabled."
            )
358
359
360
361
            vllm_config.scheduler_config.enable_chunked_prefill = False
            vllm_config.scheduler_config.chunked_prefill_enabled = False
            vllm_config.scheduler_config.max_num_batched_tokens = max(
                vllm_config.scheduler_config.max_model_len,
362
363
                DEFAULT_MAX_NUM_BATCHED_TOKENS,
            )
364

365
    @classmethod
366
    def get_allowed_cpu_core_node_list(cls) -> tuple[list[int], list[LogicalCPUInfo]]:
367
368
369
        assert platform.system() == "Linux"

        # Init LogicalCPUInfo from lscpu
370
371
372
        lscpu_output = subprocess.check_output(
            "lscpu -J -e=CPU,CORE,NODE", shell=True, text=True
        )
373
        lscpu_output = re.sub(r'"node":\s*-\s*(,|\n)', r'"node": 0\1', lscpu_output)
374
        logical_cpu_list: list[LogicalCPUInfo] = json.loads(
375
376
            lscpu_output, object_hook=LogicalCPUInfo.json_decoder
        )["cpus"]
377
378
379

        # Filter CPUs with invalid attributes
        logical_cpu_list = [
380
381
            x
            for x in logical_cpu_list
382
383
384
385
            if -1 not in (x.id, x.physical_core, x.numa_node)
        ]

        # Filter allowed CPUs
386
387
388
389
390
        if hasattr(os, "sched_getaffinity"):
            allowed_cpu_id_list = os.sched_getaffinity(0)
        else:
            raise NotImplementedError("Unsupported OS")
        logical_cpu_list = [x for x in logical_cpu_list if x.id in allowed_cpu_id_list]
391
392
393
394
395
396
397

        # Get allowed NUMA nodes
        allowed_numa_nodes = set()
        for x in logical_cpu_list:
            allowed_numa_nodes.add(x.numa_node)  # type: ignore
        allowed_numa_nodes_list = sorted(allowed_numa_nodes)

398
        env_key = CpuPlatform.device_control_env_var
399
400
        if env_key in os.environ and os.environ[env_key] != "":
            visible_nodes = [int(s) for s in os.environ[env_key].split(",")]
401
402
403
404
            allowed_numa_nodes_list = [
                x for x in visible_nodes if x in allowed_cpu_id_list
            ]

405
406
        return allowed_numa_nodes_list, logical_cpu_list

407
408
409
410
    @classmethod
    def is_pin_memory_available(cls) -> bool:
        logger.warning("Pin memory is not supported on CPU.")
        return False
411
412
413
414

    @classmethod
    def get_punica_wrapper(cls) -> str:
        return "vllm.lora.punica_wrapper.punica_cpu.PunicaWrapperCPU"
415
416
417
418
419
420
421

    @classmethod
    def get_device_communicator_cls(cls) -> str:
        """
        Get device specific communicator class for distributed communication.
        """
        return "vllm.distributed.device_communicators.cpu_communicator.CpuCommunicator"  # noqa
422
423
424
425
426

    @classmethod
    def supports_structured_output(cls) -> bool:
        return True

427
428
429
    @classmethod
    def opaque_attention_op(cls) -> bool:
        return True
430
431
432
433

    @classmethod
    def support_hybrid_kv_cache(cls) -> bool:
        return True