cpu_worker.py 6.06 KB
Newer Older
1
2
# SPDX-License-Identifier: Apache-2.0
import os
3
from importlib import util
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from typing import Optional

import torch

from vllm import envs
from vllm.config import VllmConfig
from vllm.distributed.parallel_state import get_pp_group, get_tp_group
from vllm.logger import init_logger
from vllm.model_executor.utils import set_random_seed
from vllm.sequence import IntermediateTensors
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.worker.cpu_model_runner import CPUModelRunner
from vllm.v1.worker.gpu_worker import (Worker,
                                       init_worker_distributed_environment)

logger = init_logger(__name__)


class CPUWorker(Worker):

    def __init__(self,
                 vllm_config: VllmConfig,
                 local_rank: int,
                 rank: int,
                 distributed_init_method: str,
                 is_driver_worker: bool = False):
        super().__init__(vllm_config,
                         local_rank,
                         rank,
                         distributed_init_method,
                         is_driver_worker=is_driver_worker)

        self.parallel_config.disable_custom_all_reduce = True

    def init_device(self):
        # Setup OpenMP threads affinity.
        omp_cpuids = envs.VLLM_CPU_OMP_THREADS_BIND
42
43
44
45
        self.local_omp_cpuid = "all"
        if omp_cpuids == "auto":
            self.local_omp_cpuid = self.get_cpus_id_binding_based_on_numa_nodes(
            )
46
47
        else:
            self.local_omp_cpuid = omp_cpuids.split("|")[self.rank]
48
49

        if self.local_omp_cpuid != "all":
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
            ret = torch.ops._C_utils.init_cpu_threads_env(self.local_omp_cpuid)
            if ret:
                logger.info(ret)

        # Note: unique identifier for creating allreduce shared memory
        os.environ["VLLM_DIST_IDENT"] = self.distributed_init_method.split(
            ":")[-1]
        # Initialize the distributed environment.
        init_worker_distributed_environment(self.vllm_config, self.rank,
                                            self.distributed_init_method,
                                            self.local_rank, "gloo")
        # Set random seed.
        set_random_seed(self.model_config.seed)

        # Construct the model runner
        self.model_runner: CPUModelRunner = CPUModelRunner(
            self.vllm_config, torch.device("cpu"))

    def sleep(self, level: int = 1) -> None:
        logger.warning("sleep mode is not supported on CPU, ignore it.")
        pass

    def wake_up(self, tags: Optional[list[str]] = None) -> None:
        logger.warning("sleep mode is not supported on CPU, ignore it.")
        pass

    def determine_available_memory(self) -> int:
        return self.cache_config.cpu_kvcache_space_bytes  # type: ignore

    def compile_or_warm_up_model(self) -> None:
        # Reset the seed to ensure that the random state is not affected by
        # the model initialization and profiling.
        set_random_seed(self.model_config.seed)
        self.model_runner.warming_up_model()

    @torch.inference_mode()
    def execute_model(
        self,
        scheduler_output: "SchedulerOutput",
    ) -> Optional[ModelRunnerOutput]:
        intermediate_tensors = None
        if not get_pp_group().is_first_rank:
            intermediate_tensors = IntermediateTensors(
                get_pp_group().recv_tensor_dict(
                    all_gather_group=get_tp_group()))

        output = self.model_runner.execute_model(scheduler_output,
                                                 intermediate_tensors)

        if not get_pp_group().is_last_rank:
            assert isinstance(output, IntermediateTensors)
            get_pp_group().send_tensor_dict(output.tensors,
                                            all_gather_group=get_tp_group())
            return None

        assert isinstance(output, ModelRunnerOutput)
        return output if self.is_driver_worker else None
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152

    def get_cpus_id_binding_based_on_numa_nodes(self) -> str:
        """Return CPUs id binding based on NUMA nodes.
        """
        rank_to_cpus = self.local_omp_cpuid
        # Setup OpenMP thread affinity based on NUMA nodes automatically
        world_size = self.vllm_config.parallel_config.world_size
        libnuma_found = util.find_spec("numa") is not None
        psutil_found = util.find_spec("psutil") is not None
        if libnuma_found and psutil_found:
            import psutil
            from numa import info
            cpu_count = psutil.cpu_count(logical=False)
            cpus_allow_list = psutil.Process().cpu_affinity()
            numa_size = info.get_num_configured_nodes()
            cpu_count_per_numa = cpu_count // numa_size
            num_of_reserved_cpu = min(envs.VLLM_CPU_NUM_OF_RESERVED_CPU,
                                      cpu_count_per_numa // 2)

            # check allow node_to_cpus list
            node_to_cpus = []
            for i in range(numa_size):
                node_intersect = set(
                    info.node_to_cpus(i)).intersection(cpus_allow_list)
                if bool(node_intersect):
                    node_to_cpus.append(list(node_intersect))

            if world_size > len(node_to_cpus):
                logger.error(
                    "Auto thread-binding failed due to "
                    "world size: %d is larger than "
                    "allowed NUMA nodes number: %d."
                    "Please try to bind threads manually.", world_size,
                    len(node_to_cpus))
            else:
                end = cpu_count_per_numa - num_of_reserved_cpu
                rank_to_cpus_list = node_to_cpus[self.rank][:end]
                rank_to_cpus = ','.join(str(x) for x in rank_to_cpus_list)
                logger.info("auto thread-binding list: %s", rank_to_cpus)
        else:
            logger.warning(
                "Auto thread-binding is not supported due to "
                "the lack of package numa and psutil,"
                "fallback to no thread-binding. To get better performance,"
                "please try to manually bind threads.")
        return rank_to_cpus