Unverified Commit 2601f18a authored by WeiQing Chen's avatar WeiQing Chen Committed by GitHub
Browse files

[EPLB] Optimize EPLB for Async Rearrange Experts (#22179)


Signed-off-by: default avatarDavid Chen <530634352@qq.com>
Co-authored-by: default avatarSunChenxiang123 <1291824390@qq.com>
parent 4de87866
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
import random
import pytest
import torch
import torch.distributed
from vllm.distributed.eplb.rebalance_execute import rearrange_expert_weights_inplace
from vllm.distributed.eplb.rebalance_execute import (
move_from_buffer,
rearrange_expert_weights_inplace,
transfer_layer,
)
from vllm.distributed.parallel_state import (
ensure_model_parallel_initialized,
get_tp_group,
......@@ -231,6 +236,100 @@ def verify_redundant_experts_have_same_weights(
)
def _test_async_transfer_layer_without_mtp_worker(
env,
world_size: int,
num_layers: int,
num_local_experts: int,
num_logical_experts: int,
) -> None:
set_env_vars_and_device(env)
ensure_model_parallel_initialized(
tensor_model_parallel_size=world_size, pipeline_model_parallel_size=1
)
tp_group = get_tp_group()
ep_group = tp_group.device_group
ep_rank = torch.distributed.get_rank()
device = torch.device(f"cuda:{ep_rank}")
total_physical_experts = world_size * num_local_experts
hidden_sizes = [16, 32]
redundancy_config = create_redundancy_config(
num_logical_experts,
total_physical_experts,
)
old_indices = create_expert_indices_with_redundancy(
num_layers,
num_logical_experts,
total_physical_experts,
redundancy_config,
)
new_redundancy_config = create_redundancy_config(
num_logical_experts,
total_physical_experts,
)
new_indices = create_expert_indices_with_redundancy(
num_layers,
num_logical_experts,
total_physical_experts,
new_redundancy_config,
)
expert_weights = create_expert_weights(
num_layers,
num_local_experts,
hidden_sizes,
ep_rank,
device,
old_indices,
)
expert_buffer = [torch.empty_like(w) for w in expert_weights[0]]
cuda_stream = torch.cuda.Stream(device=device)
for layer_idx in range(num_layers):
is_unchanged, is_received_locally, experts_recv_loc = asyncio.run(
transfer_layer(
old_global_expert_indices=old_indices,
new_global_expert_indices=new_indices,
expert_weights=expert_weights,
expert_weights_buffer=expert_buffer,
ep_group=ep_group,
layer=layer_idx,
cuda_stream=cuda_stream,
)
)
cuda_stream.synchronize()
move_from_buffer(
expert_weights=expert_weights[layer_idx],
expert_weights_buffer=expert_buffer,
is_unchanged=is_unchanged,
is_received_locally=is_received_locally,
experts_recv_loc=experts_recv_loc,
new_indices=new_indices[layer_idx].tolist(),
ep_group=ep_group,
)
verify_expert_weights_after_shuffle(
expert_weights,
new_indices,
hidden_sizes,
ep_rank,
num_local_experts,
)
verify_redundant_experts_have_same_weights(
expert_weights,
new_indices,
hidden_sizes,
world_size,
num_local_experts,
)
def _test_rearrange_expert_weights_with_redundancy(
env, world_size, num_layers, num_local_experts, num_logical_experts
) -> None:
......@@ -399,6 +498,32 @@ def _test_rearrange_expert_weights_no_change(env, world_size) -> None:
)
@pytest.mark.parametrize(
"world_size,num_layers,num_local_experts,num_logical_experts",
[
(2, 2, 2, 3),
],
)
def test_async_transfer_layer_without_mtp(
world_size: int,
num_layers: int,
num_local_experts: int,
num_logical_experts: int,
):
"""Exercise async EPLB transfer path without MTP/spec decode."""
if torch.cuda.device_count() < world_size:
pytest.skip(f"Need at least {world_size} GPUs to run the test")
distributed_run(
_test_async_transfer_layer_without_mtp_worker,
world_size,
num_layers,
num_local_experts,
num_logical_experts,
)
@pytest.mark.parametrize("world_size", [2, 4])
def test_rearrange_expert_weights_no_change(world_size):
"""
......
......@@ -10,10 +10,11 @@ from tests.utils import large_gpu_mark
def get_model_args(
model_name: str,
spec_model_name: str,
spec_model_name: str | None,
spec_method: str,
tp_size: int,
model_max_len: int,
use_async: bool = False,
) -> dict:
speculative_config = {
"method": spec_method,
......@@ -37,6 +38,8 @@ def get_model_args(
"enable_eplb": True,
"max_model_len": model_max_len,
}
if use_async:
model_args["eplb_config"] = {"use_async": True}
return model_args
......@@ -94,3 +97,37 @@ def test_eplb_spec_decode(
measured_value - RTOL < expected_gsm8k_value
and measured_value + RTOL > expected_gsm8k_value
), f"Expected: {expected_gsm8k_value} | Measured: {measured_value}"
@large_gpu_mark(min_gb=80)
def test_eplb_spec_decode_qwen3_next_mtp_async() -> None:
"""
Ensure async EPLB works with MTP speculative decoding for Qwen3-Next.
"""
TASK = "gsm8k"
FILTER = "exact_match,strict-match"
RTOL = 0.03
expected_gsm8k_value = 0.86
model_args = get_model_args(
model_name="Qwen/Qwen3-Next-80B-A3B-Instruct",
spec_model_name=None,
spec_method="mtp",
tp_size=4,
model_max_len=4096,
use_async=True,
)
results = lm_eval.simple_evaluate(
model="vllm",
model_args=model_args,
tasks=TASK,
batch_size=64,
num_fewshot=8,
)
measured_value = results["results"][TASK][FILTER]
assert (
measured_value - RTOL < expected_gsm8k_value
and measured_value + RTOL > expected_gsm8k_value
), f"Expected: {expected_gsm8k_value} | Measured: {measured_value}"
......@@ -60,6 +60,10 @@ class EPLBConfig:
Log the balancedness each step of expert parallelism.
This is turned off by default since it will cause communication overhead.
"""
use_async: bool = False
"""
Whether to use non-blocking EPLB.
"""
@config
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
The async worker that transfers experts in the background.
"""
import asyncio
import threading
from typing import TYPE_CHECKING
import torch
from torch.distributed import ProcessGroup
from vllm.distributed.parallel_state import get_ep_group
from vllm.logger import init_logger
from .rebalance_execute import transfer_layer
if TYPE_CHECKING:
from .eplb_state import EplbState
logger = init_logger(__name__)
def start_async_worker(
state: "EplbState",
rank_mapping: dict[int, int] | None = None,
is_profile: bool = False,
) -> threading.Thread:
ep_group = get_ep_group().device_group
rank = ep_group.rank()
device_index = state.cuda_device_index
def thread_target() -> None:
assert device_index is not None
torch.cuda.set_device(device_index)
cuda_stream = torch.cuda.Stream(device=device_index)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
transfer_run_periodically(
state=state,
ep_group=ep_group,
is_profile=is_profile,
rank_mapping=rank_mapping,
cuda_stream=cuda_stream,
)
)
except Exception as exc: # pragma: no cover - diagnostic path
logger.exception("async loop error (Rank %d): %s", rank, str(exc))
finally:
loop.close()
thread = threading.Thread(target=thread_target, daemon=True)
thread.start()
return thread
async def transfer_run_periodically(
state: "EplbState",
ep_group: ProcessGroup,
is_profile: bool = False,
rank_mapping: dict[int, int] | None = None,
cuda_stream: torch.cuda.Stream = None,
) -> None:
while True:
await asyncio.to_thread(state.rearrange_event.wait)
logger.info("async worker woke up for EPLB transfer")
for model_state in state.model_states.values():
if not model_state.is_async_enabled:
continue
current_num_layers = model_state.model.num_moe_layers
while (
model_state.rebalanced
and model_state.layer_to_transfer < current_num_layers
):
if (
not model_state.ep_buffer_ready
and model_state.rebalanced
and model_state.new_physical_to_logical_map is not None
):
await asyncio.to_thread(model_state.buffer_lock.acquire)
try:
if model_state.layer_to_transfer >= current_num_layers:
break
(
model_state.is_unchanged,
model_state.is_received_locally,
model_state.experts_recv_loc,
) = await transfer_layer(
old_global_expert_indices=model_state.physical_to_logical_map,
new_global_expert_indices=model_state.new_physical_to_logical_map,
expert_weights=model_state.model.expert_weights,
expert_weights_buffer=model_state.expert_buffer,
ep_group=ep_group,
is_profile=is_profile,
layer=model_state.layer_to_transfer,
cuda_stream=cuda_stream,
rank_mapping=rank_mapping,
)
event = torch.cuda.Event(blocking=False)
cuda_stream.record_event(event)
model_state.buffer_ready_event = event
model_state.ep_buffer_ready = 1
finally:
model_state.buffer_lock.release()
else:
if not model_state.rebalanced:
break
await asyncio.sleep(0.001)
state.rearrange_event.clear()
This diff is collapsed.
......@@ -100,18 +100,19 @@ def get_ep_ranks_with_expert(
return ranks_to_send, ranks_to_recv_actual
def shuffle_layer(
def move_to_buffer(
num_local_experts: int,
ep_rank: int,
old_indices: Sequence[int],
new_indices: Sequence[int],
expert_weights: Iterable[torch.Tensor],
expert_weights_buffer: Sequence[torch.Tensor],
cuda_stream: torch.cuda.Stream | None,
ep_group: ProcessGroup,
) -> None:
) -> tuple[list[bool], list[bool], dict[int, int]]:
"""
Perform expert weights rearrangement of one layer.
"""
ep_rank = ep_group.rank()
local2global = partial(
idx_local_to_global,
local_cnt=num_local_experts,
......@@ -137,7 +138,8 @@ def shuffle_layer(
if old_indices[src_global] == new_indices[dst_global]:
is_received_locally[dst] = True
for weight, buffer in zip(expert_weights, expert_weights_buffer):
buffer[dst].copy_(weight[src])
with torch.cuda.stream(cuda_stream):
buffer[dst].copy_(weight[src], non_blocking=True)
p2p_ops: list[P2POp] = []
......@@ -225,25 +227,115 @@ def shuffle_layer(
]
# 4. Execute the P2P operations. The real communication happens here.
if p2p_ops:
if p2p_ops and cuda_stream is not None:
with torch.cuda.stream(cuda_stream):
reqs = batch_isend_irecv(p2p_ops)
for req in reqs:
req.wait()
elif p2p_ops:
reqs = batch_isend_irecv(p2p_ops)
for req in reqs:
req.wait()
# wait for the communication to finish
return is_unchanged, is_received_locally, experts_recv_loc
def move_from_buffer(
expert_weights: Iterable[torch.Tensor],
expert_weights_buffer: list[torch.Tensor],
is_unchanged: list[bool],
is_received_locally: list[bool],
experts_recv_loc: dict[int, int],
new_indices: Sequence[int],
ep_group: ProcessGroup,
) -> None:
ep_rank = ep_group.rank()
num_local_experts = len(is_unchanged)
local2global = partial(
idx_local_to_global, local_cnt=num_local_experts, ep_rank=ep_rank
)
# 5. Copy the weights from the buffer back to the original weights.
for dst in range(num_local_experts):
if is_unchanged[dst]:
continue
if is_received_locally[dst]:
for weight, buffer in zip(expert_weights, expert_weights_buffer):
weight[dst].copy_(buffer[dst])
weight[dst].copy_(buffer[dst], non_blocking=True)
else:
expert = new_indices[local2global(dst)]
if expert == -1:
continue
src = experts_recv_loc[expert]
for weight, buffer in zip(expert_weights, expert_weights_buffer):
weight[dst].copy_(buffer[src])
weight[dst].copy_(buffer[src], non_blocking=True)
async def transfer_layer(
old_global_expert_indices: torch.Tensor,
new_global_expert_indices: torch.Tensor,
expert_weights: Sequence[Iterable[torch.Tensor]],
expert_weights_buffer: Sequence[torch.Tensor],
ep_group: ProcessGroup,
is_profile: bool = False,
layer: int = 0,
cuda_stream: torch.cuda.Stream | None = None,
rank_mapping: dict[int, int] | None = None,
) -> tuple[list[bool], list[bool], dict[int, int]]:
"""
Rearranges the expert weights in place according to the new expert indices.
The value of the indices arguments are logical indices of the experts,
while keys are physical.
Args:
old_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
new_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
expert_weights: A sequence of shape (num_moe_layers)(weight_count)
of tensors of shape (num_local_physical_experts, hidden_size_i).
For example, a linear layer may have up and down projection,
so weight_count = 2. Each weight's hidden size can be different.
ep_group: The device process group for expert parallelism.
is_profile (bool): If `True`, do not perform any actual weight copy.
This is used during profile run, where we only perform dummy
communications to reserve enough memory for the buffers.
"""
ep_size = ep_group.size()
if rank_mapping is not None:
if len(rank_mapping) == ep_group.size():
# scale down
new_global_expert_indices = _map_new_expert_indices_with_rank_mapping(
new_global_expert_indices,
rank_mapping,
)
else:
# scale up
old_global_expert_indices = _map_old_expert_indices_with_rank_mapping(
old_global_expert_indices,
rank_mapping,
ep_group.size(),
)
assert old_global_expert_indices.shape[1] == new_global_expert_indices.shape[1]
num_moe_layers, num_physical_experts = old_global_expert_indices.shape
assert len(expert_weights) == num_moe_layers
num_local_physical_experts = next(iter(expert_weights[0])).shape[0]
assert new_global_expert_indices.shape == (num_moe_layers, num_physical_experts)
assert num_physical_experts == ep_size * num_local_physical_experts
# A buffer to hold the expert weights in one layer during the exchange.
# NOTE: Currently we assume the same weights across different layers
# have the same shape.
is_unchanged, is_received_locally, experts_recv_loc = move_to_buffer(
num_local_experts=num_local_physical_experts,
old_indices=old_global_expert_indices[layer].tolist(),
new_indices=new_global_expert_indices[layer].tolist(),
expert_weights=expert_weights[layer],
expert_weights_buffer=expert_weights_buffer,
cuda_stream=cuda_stream,
ep_group=ep_group,
)
return is_unchanged, is_received_locally, experts_recv_loc
def rearrange_expert_weights_inplace(
......@@ -296,7 +388,6 @@ def rearrange_expert_weights_inplace(
num_local_physical_experts = next(iter(expert_weights[0])).shape[0]
assert new_global_expert_indices.shape == (num_moe_layers, num_physical_experts)
ep_rank = ep_group.rank()
ep_size = ep_group.size()
assert num_physical_experts == ep_size * num_local_physical_experts
......@@ -329,14 +420,24 @@ def rearrange_expert_weights_inplace(
torch.cuda.synchronize()
for layer in range(num_moe_layers):
shuffle_layer(
num_local_physical_experts,
ep_rank,
old_global_expert_indices_cpu[layer].tolist(),
new_global_expert_indices_cpu[layer].tolist(),
expert_weights[layer],
expert_weights_buffer,
ep_group,
is_unchanged, is_received_locally, experts_recv_loc = move_to_buffer(
num_local_experts=num_local_physical_experts,
old_indices=old_global_expert_indices_cpu[layer].tolist(),
new_indices=new_global_expert_indices_cpu[layer].tolist(),
expert_weights=expert_weights[layer],
expert_weights_buffer=expert_weights_buffer,
cuda_stream=None,
ep_group=ep_group,
)
move_from_buffer(
expert_weights=expert_weights[layer],
expert_weights_buffer=expert_weights_buffer,
is_unchanged=is_unchanged,
is_received_locally=is_received_locally,
experts_recv_loc=experts_recv_loc,
new_indices=new_global_expert_indices[layer].tolist(),
ep_group=ep_group,
)
......@@ -428,4 +529,4 @@ def _map_new_expert_indices_with_rank_mapping(
return mapped_expert_indices
__all__ = ["rearrange_expert_weights_inplace"]
__all__ = ["transfer_layer", "move_from_buffer"]
......@@ -3370,6 +3370,8 @@ class GPUModelRunner(
old_global_expert_indices,
rank_mapping,
)
if self.eplb_state.is_async:
self.eplb_state.start_async_loop(rank_mapping=rank_mapping)
if (
self.vllm_config.compilation_config.mode
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment