Unverified Commit cb62e86f authored by ZhengHongming888's avatar ZhengHongming888 Committed by GitHub
Browse files

Add NUMA Core binding in nixl_connector for CPU xPyD (#32365)


Signed-off-by: default avatarHongming Zheng <hongming.zheng@intel.com>
Signed-off-by: default avatarZhengHongming888 <hongming.zheng@intel.com>
Co-authored-by: default avatargemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
parent 781ddf78
...@@ -926,6 +926,17 @@ class NixlConnectorWorker: ...@@ -926,6 +926,17 @@ class NixlConnectorWorker:
else: else:
self.use_host_buffer = self.kv_buffer_device == "cpu" self.use_host_buffer = self.kv_buffer_device == "cpu"
# reserve different cores for start_load_kv() from model_forward()
if self.device_type == "cpu":
numa_core_list = current_platform.discover_numa_topology()
# setup one last core in each numa for kv transfer.
rsv_cores_for_kv = [
max(each_numa_core_list) for each_numa_core_list in numa_core_list
]
if rsv_cores_for_kv:
os.sched_setaffinity(0, rsv_cores_for_kv)
# support for oot platform which can't register nixl memory # support for oot platform which can't register nixl memory
# type based on kv_buffer_device # type based on kv_buffer_device
nixl_memory_type = current_platform.get_nixl_memory_type() nixl_memory_type = current_platform.get_nixl_memory_type()
......
...@@ -213,6 +213,13 @@ class CpuPlatform(Platform): ...@@ -213,6 +213,13 @@ class CpuPlatform(Platform):
cache_config.cpu_kvcache_space_bytes = CpuPlatform.get_device_total_memory() cache_config.cpu_kvcache_space_bytes = CpuPlatform.get_device_total_memory()
# reserve at least one core for nixl_connector under p/d case
if vllm_config.kv_transfer_config and (
envs.VLLM_CPU_NUM_OF_RESERVED_CPU == 0
or envs.VLLM_CPU_NUM_OF_RESERVED_CPU is None
):
os.environ["VLLM_CPU_NUM_OF_RESERVED_CPU"] = "1"
parallel_config = vllm_config.parallel_config parallel_config = vllm_config.parallel_config
if ( if (
parallel_config.world_size > 1 parallel_config.world_size > 1
...@@ -396,6 +403,60 @@ class CpuPlatform(Platform): ...@@ -396,6 +403,60 @@ class CpuPlatform(Platform):
return allowed_numa_nodes_list, logical_cpu_list return allowed_numa_nodes_list, logical_cpu_list
@classmethod
def discover_numa_topology(cls) -> list[list[int]]:
"""
Discover NUMA topology and keep the last physical core of each numa
into one core group list for nixl start_kv_load()
"""
SYS_NODE = "/sys/devices/system/node"
SYS_CPU = "/sys/devices/system/cpu"
if not (os.path.exists(SYS_NODE) and os.path.exists(SYS_CPU)):
return []
core_rsv_for_kv = []
for node in os.listdir(SYS_NODE):
if not node.startswith("node") or not node[4:].isdigit():
continue
node_path = f"{SYS_NODE}/{node}"
seen_phys = set()
for cpu in os.listdir(node_path):
if not cpu.startswith("cpu") or not cpu[3:].isdigit():
continue
cpu_id = int(cpu[3:])
# thread_siblings based on cpu_id
path = f"{SYS_CPU}/cpu{cpu_id}/topology/thread_siblings_list"
if os.path.exists(path):
try:
with open(path) as f:
s = f.read()
cpus: list[int] = []
for part in s.strip().split(","):
if "-" in part:
a, b = map(int, part.split("-"))
cpus.extend(range(a, b + 1))
else:
cpus.append(int(part))
siblings = cpus if cpus else [cpu_id]
except (OSError, ValueError):
siblings = [cpu_id]
else:
siblings = [cpu_id]
phys = min(siblings)
if phys not in seen_phys:
seen_phys.add(phys)
if len(seen_phys) > 0:
core_rsv_for_kv.append(list(seen_phys))
return core_rsv_for_kv
@classmethod @classmethod
def is_pin_memory_available(cls) -> bool: def is_pin_memory_available(cls) -> bool:
return False return False
......
...@@ -54,6 +54,7 @@ class CPUWorker(Worker): ...@@ -54,6 +54,7 @@ class CPUWorker(Worker):
def init_device(self): def init_device(self):
# Setup OpenMP threads affinity. # Setup OpenMP threads affinity.
omp_cpuids = envs.VLLM_CPU_OMP_THREADS_BIND omp_cpuids = envs.VLLM_CPU_OMP_THREADS_BIND
# Under numa binding some cores reserved for kv transfer in nixl_connector.py
if omp_cpuids == "auto" and platform.system() == "Linux": if omp_cpuids == "auto" and platform.system() == "Linux":
cpu_arch = current_platform.get_cpu_architecture() cpu_arch = current_platform.get_cpu_architecture()
if cpu_arch in (CpuArchEnum.POWERPC, CpuArchEnum.S390X): if cpu_arch in (CpuArchEnum.POWERPC, CpuArchEnum.S390X):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment