Unverified Commit cbd361fd authored by Charles Ashby's avatar Charles Ashby Committed by GitHub
Browse files

[CPU][Distributed] Fix Enable _CPUSHMDistributed only when TP/PP ranks share...


[CPU][Distributed] Fix Enable _CPUSHMDistributed only when TP/PP ranks share the same SHM group name (#34169)
Signed-off-by: default avatarCharles Ashby <charlesa.l@hotmail.com>
parent c212202d
...@@ -35,8 +35,15 @@ class CpuCommunicator(DeviceCommunicatorBase): ...@@ -35,8 +35,15 @@ class CpuCommunicator(DeviceCommunicatorBase):
) )
and hasattr(torch.ops._C, "init_shm_manager") and hasattr(torch.ops._C, "init_shm_manager")
and (unique_name.startswith("tp") or unique_name.startswith("pp")) and (unique_name.startswith("tp") or unique_name.startswith("pp"))
and self._all_group_ranks_share_shm_group_name()
): ):
self.dist_module = _CPUSHMDistributed(self) self.dist_module = _CPUSHMDistributed(self)
elif unique_name.startswith("tp") or unique_name.startswith("pp"):
logger.info(
"CPU SHM communicator disabled for group %s: ranks do not share "
"the same SHM group name, falling back to torch.distributed.",
unique_name,
)
if self.use_all2all: if self.use_all2all:
if self.all2all_backend != "naive": # type: ignore[has-type] if self.all2all_backend != "naive": # type: ignore[has-type]
...@@ -52,6 +59,20 @@ class CpuCommunicator(DeviceCommunicatorBase): ...@@ -52,6 +59,20 @@ class CpuCommunicator(DeviceCommunicatorBase):
self.all2all_manager = NaiveAll2AllManager(self.cpu_group) self.all2all_manager = NaiveAll2AllManager(self.cpu_group)
logger.info("Using naive all2all manager.") logger.info("Using naive all2all manager.")
def _all_group_ranks_share_shm_group_name(self) -> bool:
"""
CPUSHM requires all ranks in this group to agree on one SHM group name.
This is a lightweight consistency check for VLLM_DIST_IDENT/name inputs.
"""
local_name = _CPUSHMDistributed.make_group_name(self)
names: list[str] = [""] * self.world_size
torch.distributed.all_gather_object(
names,
local_name,
group=self.device_group,
)
return len(set(names)) == 1
def all_reduce(self, input_): def all_reduce(self, input_):
self.dist_module.all_reduce(input_, group=self.device_group) self.dist_module.all_reduce(input_, group=self.device_group)
return input_ return input_
...@@ -193,17 +214,21 @@ class CpuCommunicator(DeviceCommunicatorBase): ...@@ -193,17 +214,21 @@ class CpuCommunicator(DeviceCommunicatorBase):
class _CPUSHMDistributed: class _CPUSHMDistributed:
def __init__(self, communicator: CpuCommunicator): def __init__(self, communicator: CpuCommunicator):
instance_identifier = os.environ["VLLM_DIST_IDENT"]
unique_name = communicator.unique_name
instance_identifier = f"{instance_identifier}-{unique_name}"
self.communicator = communicator self.communicator = communicator
group_ranks = [str(rank) for rank in self.communicator.ranks] self.group_name = self.make_group_name(communicator)
shm_group_identifier = f"[{'-'.join(group_ranks)}]"
self.group_name = f"{instance_identifier}-{shm_group_identifier}-cpushm"
self.handle = self._init_cpu_shm() self.handle = self._init_cpu_shm()
@staticmethod
def make_group_name(communicator: CpuCommunicator) -> str:
instance_identifier = os.environ["VLLM_DIST_IDENT"]
unique_name = communicator.unique_name
instance_identifier = f"{instance_identifier}-{unique_name}"
group_ranks = [str(rank) for rank in communicator.ranks]
shm_group_identifier = f"[{'-'.join(group_ranks)}]"
return f"{instance_identifier}-{shm_group_identifier}-cpushm"
def _init_cpu_shm(self) -> int: def _init_cpu_shm(self) -> int:
thread_num_tensor = torch.tensor( thread_num_tensor = torch.tensor(
[torch.get_num_threads()], [torch.get_num_threads()],
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment