Unverified Commit 3461c8b0 authored by Sage Moore's avatar Sage Moore Committed by GitHub
Browse files

[EPLB] Refactor Async EPLB synchronization logic (#37601)


Signed-off-by: default avatarSage Moore <sage@neuralmagic.com>
Co-authored-by: default avatarTyler Michael Smith <tyler@neuralmagic.com>
parent 726efe17
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import threading
import time
import torch
from vllm.distributed.eplb.eplb_utils import CpuGpuEvent
def test_wait_blocks_until_record():
event = CpuGpuEvent()
record_stream = torch.cuda.Stream()
wait_stream = torch.cuda.Stream()
wait_returned = threading.Event()
def waiter():
event.wait(stream=wait_stream)
wait_returned.set()
t = threading.Thread(target=waiter)
t.start()
time.sleep(0.05)
assert not wait_returned.is_set(), "wait() returned before record() was called"
event.record(stream=record_stream)
t.join(timeout=5.0)
assert not event._recorded.is_set()
def test_reuse_across_multiple_cycles():
wrapper = CpuGpuEvent()
record_stream = torch.cuda.Stream()
wait_stream = torch.cuda.Stream()
NUM_CYCLES = 8
completed_cycles = []
barriers = [threading.Barrier(2) for _ in range(NUM_CYCLES)]
def waiter():
for i in range(NUM_CYCLES):
wrapper.wait(stream=wait_stream)
completed_cycles.append(True)
barriers[i].wait()
t = threading.Thread(target=waiter)
t.start()
for i in range(NUM_CYCLES):
wrapper.record(stream=record_stream)
barriers[i].wait()
t.join(timeout=10.0)
assert len(completed_cycles) == NUM_CYCLES
def test_producer_consumer():
"""
This test uses the CpuGpuEvent to synchronize reads and writes to/from a shared GPU
tensor on multiple CPU threads.
"""
worker_stream = torch.cuda.Stream()
# Create a single element counter that will be shared between two threads
buf = torch.zeros(1, device="cuda")
NUM_ROUNDS = 5
ready_cpu = [threading.Event() for _ in range(NUM_ROUNDS)]
events = [CpuGpuEvent() for _ in range(NUM_ROUNDS)]
errors: list[str] = []
# For each round, the worker thread (writer) sets the counter in buf and waits for
# the main thread to read it.
def worker():
for i in range(NUM_ROUNDS):
if i > 0:
events[i - 1].wait(stream=worker_stream)
with torch.cuda.stream(worker_stream):
buf.fill_(float(i + 1))
worker_stream.synchronize()
ready_cpu[i].set()
t = threading.Thread(target=worker)
t.start()
for i in range(NUM_ROUNDS):
ready_cpu[i].wait()
snapshot = buf.clone()
events[i].record()
val = snapshot.item()
if val != float(i + 1):
errors.append(f"round {i}: expected {i + 1:.1f}, got {val:.1f}")
t.join(timeout=10.0)
assert not errors, f"Buffer ordering errors: {errors}"
...@@ -80,7 +80,7 @@ def test_commit_eplb_maps_for_layer_logical_padding(): ...@@ -80,7 +80,7 @@ def test_commit_eplb_maps_for_layer_logical_padding():
.contiguous() .contiguous()
) )
layer = 0 layer = 0
_commit_eplb_maps_for_layer(model_state, new_phy2log, layer) _commit_eplb_maps_for_layer(model_state, new_phy2log[layer], layer)
assert torch.all(model_state.logical_to_physical_map[layer, :, 2] == -1) assert torch.all(model_state.logical_to_physical_map[layer, :, 2] == -1)
...@@ -143,7 +143,7 @@ def test_commit_eplb_maps_for_layer(): ...@@ -143,7 +143,7 @@ def test_commit_eplb_maps_for_layer():
) )
new_logcnt = torch.tensor([[2, 1, 1], [1, 2, 1]], dtype=torch.long) new_logcnt = torch.tensor([[2, 1, 1], [1, 2, 1]], dtype=torch.long)
_commit_eplb_maps_for_layer(model_state, new_phy2log, layer=0) _commit_eplb_maps_for_layer(model_state, new_phy2log[0], layer=0)
# Layer 0 updated # Layer 0 updated
assert torch.equal(model_state.physical_to_logical_map[0], new_phy2log[0]) assert torch.equal(model_state.physical_to_logical_map[0], new_phy2log[0])
......
...@@ -14,7 +14,8 @@ from torch.distributed import ProcessGroup ...@@ -14,7 +14,8 @@ from torch.distributed import ProcessGroup
from vllm.distributed.parallel_state import get_eplb_group from vllm.distributed.parallel_state import get_eplb_group
from vllm.logger import init_logger from vllm.logger import init_logger
from .rebalance_execute import transfer_layer from .eplb_utils import CpuGpuEvent
from .rebalance_execute import AsyncEplbLayerResult, transfer_layer
if TYPE_CHECKING: if TYPE_CHECKING:
from .eplb_state import EplbModelState, EplbState from .eplb_state import EplbModelState, EplbState
...@@ -60,18 +61,14 @@ def run_rebalance_experts( ...@@ -60,18 +61,14 @@ def run_rebalance_experts(
model_state: "EplbModelState", model_state: "EplbModelState",
eplb_state: "EplbState", eplb_state: "EplbState",
physical_to_logical_map_cpu: torch.Tensor, physical_to_logical_map_cpu: torch.Tensor,
) -> None: cuda_stream: torch.cuda.Stream,
) -> torch.Tensor:
assert model_state.eplb_stats is not None assert model_state.eplb_stats is not None
eplb_stats = model_state.eplb_stats eplb_stats = model_state.eplb_stats
# Wait for the main thread's all-reduce and clone to complete before
# accessing the global_expert_load_window tensor.
assert model_state.window_ready_event is not None
model_state.window_ready_event.wait()
model_state.window_ready_event = None
# Move the global expert load window to CPU for computation. # Move the global expert load window to CPU for computation.
global_expert_load_window = eplb_stats.global_expert_load_window.cpu() with torch.cuda.stream(cuda_stream):
global_expert_load_window = eplb_stats.global_expert_load_window.cpu()
# Compute new expert mappings for the model # Compute new expert mappings for the model
new_physical_to_logical_map = eplb_state.policy.rebalance_experts( new_physical_to_logical_map = eplb_state.policy.rebalance_experts(
global_expert_load_window, global_expert_load_window,
...@@ -83,7 +80,7 @@ def run_rebalance_experts( ...@@ -83,7 +80,7 @@ def run_rebalance_experts(
) )
assert new_physical_to_logical_map.device == torch.device("cpu") assert new_physical_to_logical_map.device == torch.device("cpu")
model_state.new_physical_to_logical_map = new_physical_to_logical_map return new_physical_to_logical_map
async def transfer_run_periodically( async def transfer_run_periodically(
...@@ -93,85 +90,71 @@ async def transfer_run_periodically( ...@@ -93,85 +90,71 @@ async def transfer_run_periodically(
is_profile: bool = False, is_profile: bool = False,
) -> None: ) -> None:
while True: while True:
await asyncio.to_thread(state.rearrange_event.wait) state.rearrange_event.wait(stream=cuda_stream)
logger.info("async worker woke up for EPLB transfer") logger.info("async worker woke up for EPLB transfer")
assert state.is_async assert state.is_async
for model_state in state.model_states.values(): for model_state in state.model_states.values():
layer_idx = 0
# Set the async worker's CUDA stream on the communicator # Set the async worker's CUDA stream on the communicator
model_state.communicator.set_stream(cuda_stream) model_state.communicator.set_stream(cuda_stream)
rebalancing_algorithm_executed = False num_layers = model_state.model.num_moe_layers
physical_to_logical_map_cpu = None
current_num_layers = model_state.model.num_moe_layers # Snapshot the physical_to_logical_map (synchronized with
while ( # rearrange_event) and copy it to CPU
model_state.rebalanced with torch.cuda.stream(cuda_stream):
and model_state.layer_to_transfer < current_num_layers physical_to_logical_map_cpu = model_state.physical_to_logical_map.cpu()
):
if not model_state.ep_buffer_ready and model_state.rebalanced: new_physical_to_logical_map = run_rebalance_experts(
# Polling the lock directly in the async thread avoids model_state, state, physical_to_logical_map_cpu, cuda_stream
# the thread switch overhead of asyncio.to_thread. )
# This is typically faster than offloading to a worker thread. logger.info(
while not model_state.buffer_lock.acquire(blocking=False): "Async worker computed new indices for model %s",
await asyncio.sleep(0) model_state.model_name,
try: )
if model_state.layer_to_transfer >= current_num_layers:
break # Execute one EPLB layer transfer per model forward pass. Each iteration
if ( # of this loop will copy the new set of expert weights into
not rebalancing_algorithm_executed # model_state.expert_buffer, which will be consumed by the main thread in
or model_state.new_physical_to_logical_map is None # move_to_workspace
): while model_state.rebalanced and layer_idx < num_layers:
# Move the physical_to_logical_map to CPU (
# for rebalancing and transfer_layer. is_unchanged,
physical_to_logical_map_cpu = ( is_received_locally,
model_state.physical_to_logical_map.cpu() recv_metadata,
) ) = await transfer_layer(
run_rebalance_experts( old_layer_indices=physical_to_logical_map_cpu[layer_idx],
model_state, state, physical_to_logical_map_cpu new_layer_indices=new_physical_to_logical_map[layer_idx],
) expert_weights=model_state.model.expert_weights[layer_idx],
rebalancing_algorithm_executed = True expert_weights_buffer=model_state.expert_buffer,
logger.info( communicator=model_state.communicator,
"Async worker computed new indices for model %s", ep_group=eplb_group,
model_state.model_name, is_profile=is_profile,
) cuda_stream=cuda_stream,
)
assert model_state.new_physical_to_logical_map is not None
assert physical_to_logical_map_cpu is not None # Wait until all writes to expert_buffer have finished before making the
# AsyncEplbLayerResult visible to the main thread.
layer_idx = model_state.layer_to_transfer cuda_stream.synchronize()
old_layer_indices = physical_to_logical_map_cpu[layer_idx]
new_layer_indices = model_state.new_physical_to_logical_map[ # This event guarantees that expert_buffer will not be overwritten by
layer_idx # subsequent iterations of this loop until the main thread has consumed
] # it. Record is called by the main thread after move_from_buffer().
consumed_event = CpuGpuEvent()
# Wait for the main thread to finish consuming the buffer
# before initiating an EPLB transfer on another layer. model_state.pending_result = AsyncEplbLayerResult(
if model_state.buffer_consumed_event is not None: layer_idx=layer_idx,
cuda_stream.wait_event(model_state.buffer_consumed_event) new_physical_to_logical_map=new_physical_to_logical_map[layer_idx],
model_state.buffer_consumed_event = None is_unchanged=is_unchanged,
is_received_locally=is_received_locally,
( recv_metadata=recv_metadata,
model_state.is_unchanged, consumed_event=consumed_event,
model_state.is_received_locally, )
model_state.recv_metadata,
) = await transfer_layer( # Block this thread until the main thread and main stream
old_layer_indices=old_layer_indices, # finish copying model_state.expert_buffer into
new_layer_indices=new_layer_indices, # model_state.model.expert_weights[layer_idx]
expert_weights=model_state.model.expert_weights[layer_idx], consumed_event.wait(stream=cuda_stream)
expert_weights_buffer=model_state.expert_buffer, logger.debug("Layer %d transfer complete", layer_idx)
ep_group=eplb_group, assert model_state.pending_result is None
communicator=model_state.communicator, layer_idx += 1
is_profile=is_profile,
cuda_stream=cuda_stream,
)
# block the async thread until the transfer to
# the intermediate buffer is complete.
cuda_stream.synchronize()
model_state.ep_buffer_ready = 1
finally:
model_state.buffer_lock.release()
else:
if not model_state.rebalanced:
break
await asyncio.sleep(0.001)
state.rearrange_event.clear()
...@@ -30,7 +30,6 @@ import threading ...@@ -30,7 +30,6 @@ import threading
from collections.abc import Sequence from collections.abc import Sequence
from dataclasses import dataclass from dataclasses import dataclass
import numpy as np
import torch import torch
from torch.distributed import ProcessGroup, all_reduce from torch.distributed import ProcessGroup, all_reduce
...@@ -48,9 +47,10 @@ from vllm.model_executor.models.interfaces import MixtureOfExperts ...@@ -48,9 +47,10 @@ from vllm.model_executor.models.interfaces import MixtureOfExperts
from .async_worker import start_async_worker from .async_worker import start_async_worker
from .eplb_communicator import EplbCommunicator, create_eplb_communicator from .eplb_communicator import EplbCommunicator, create_eplb_communicator
from .eplb_utils import CpuGpuEvent
from .policy import EPLB_POLICIES, AbstractEplbPolicy, DefaultEplbPolicy from .policy import EPLB_POLICIES, AbstractEplbPolicy, DefaultEplbPolicy
from .rebalance_execute import ( from .rebalance_execute import (
RecvMetadata, AsyncEplbLayerResult,
move_from_buffer, move_from_buffer,
rearrange_expert_weights_inplace, rearrange_expert_weights_inplace,
) )
...@@ -174,55 +174,20 @@ class EplbModelState: ...@@ -174,55 +174,20 @@ class EplbModelState:
""" """
The buffer to store the expert weights during transfer. The buffer to store the expert weights during transfer.
""" """
buffer_lock: threading.Lock
"""
The lock to protect the expert buffer.
"""
buffer_consumed_event: torch.cuda.Event | None
"""
CUDA event recorded after the main thread finishes consuming the buffer.
The async worker waits on this before writing to the buffer again.
"""
window_ready_event: torch.cuda.Event | None
"""
CUDA event recorded after all-reduce and clone on the main thread.
The async worker waits on this before accessing global_expert_load_window.
"""
ep_buffer_ready: int
"""
The flag indicates whether the expert buffer is ready for transfer.
0 or 1.
"""
layer_to_transfer: int
"""
The layer index to transfer in async mode.
"""
rebalanced: bool rebalanced: bool
""" """
The flag indicates whether the experts rebalance have been computed. This flag is only used when running Async EPLB. It is set to True by the main thread
""" after the new expert maps have been computed. This indicates that the async worker
pending_global_ready_check: bool should start transferring weights. move_to_workspace sets this flag to False when
""" all weights have been transferred and the new map has been successfully committed.
Whether the async EPLB needs to poll peers for buffer readiness.
rebalanced relies on the GIL to synchronize access between the main thread and
the async worker.
""" """
eplb_stats: EplbStats | None eplb_stats: EplbStats | None
""" """
EPLB stats for the model. EPLB stats for the model.
""" """
is_unchanged: np.ndarray
"""
intermediate variable between `move_to_buffer` and `move_to_workspace`.
The size is same as the num of physical experts in the current layer.
"""
is_received_locally: np.ndarray
"""
intermediate variable between `move_to_buffer` and `move_to_workspace`.
The size is same as the num of physical experts in the current layer.
"""
recv_metadata: RecvMetadata
"""
intermediate variable between `move_to_buffer` and `move_to_workspace`.
"""
cuda_device_index: int | None cuda_device_index: int | None
""" """
CUDA device index for the async EPLB worker thread. CUDA device index for the async EPLB worker thread.
...@@ -231,10 +196,14 @@ class EplbModelState: ...@@ -231,10 +196,14 @@ class EplbModelState:
""" """
The communicator for expert weight transfers. The communicator for expert weight transfers.
""" """
new_physical_to_logical_map: torch.Tensor | None = None pending_result: AsyncEplbLayerResult | None = None
""" """
intermediate variable between `move_to_buffer` and `move_to_workspace`. Set by the async worker after all writes to expert_buffer are done. Consumed
the size is same as physical_to_logical_map and reset to None by the main thread in move_to_workspace() after the contents of
expert_buffer have been transferred out. At most one result is pending at a time.
pending_result relies on the GIL to synchronize access between the main thread and
the async worker.
""" """
...@@ -289,7 +258,7 @@ class EplbState: ...@@ -289,7 +258,7 @@ class EplbState:
""" """
The flag indicates whether the EPLB is running in async mode. The flag indicates whether the EPLB is running in async mode.
""" """
self.rearrange_event = threading.Event() self.rearrange_event: CpuGpuEvent = CpuGpuEvent()
""" """
Event to signal when a new rearrangement is needed for the async thread. Event to signal when a new rearrangement is needed for the async thread.
""" """
...@@ -493,25 +462,10 @@ class EplbState: ...@@ -493,25 +462,10 @@ class EplbState:
model_name=model_config.model, model_name=model_config.model,
model=model, model=model,
expert_buffer=expert_buffer, expert_buffer=expert_buffer,
buffer_lock=threading.Lock(),
buffer_consumed_event=None,
window_ready_event=None,
ep_buffer_ready=0,
layer_to_transfer=0,
rebalanced=False, rebalanced=False,
pending_global_ready_check=False,
eplb_stats=None, eplb_stats=None,
is_unchanged=np.array([]),
is_received_locally=np.array([]),
recv_metadata=RecvMetadata(
recv_primary_mask=np.array([]),
recv_count=0,
recv_expert_ids=np.array([]),
recv_dst_rows=np.array([]),
),
cuda_device_index=self.cuda_device_index, cuda_device_index=self.cuda_device_index,
communicator=communicator, communicator=communicator,
new_physical_to_logical_map=None,
) )
self.model_states[model_config.compute_hash()] = model_state self.model_states[model_config.compute_hash()] = model_state
self.num_valid_physical_experts = model.num_physical_experts self.num_valid_physical_experts = model.num_physical_experts
...@@ -622,17 +576,17 @@ class EplbState: ...@@ -622,17 +576,17 @@ class EplbState:
self.expert_rearrangement_step += 1 self.expert_rearrangement_step += 1
if self.is_async: if self.is_async:
# Run _move_to_workspace if all ranks have finished transferring the
# new weights to the intermediate buffer
for eplb_model_state in self.model_states.values(): for eplb_model_state in self.model_states.values():
all_ranks_buffer_ready = False # rebalanced must remain consistent amongst all ranks otherwise the
if eplb_model_state.pending_global_ready_check: # all_reduce in _all_ranks_result_ready will hang
all_ranks_buffer_ready = self._all_ranks_buffer_ready( if eplb_model_state.rebalanced and self._all_ranks_result_ready(
eplb_model_state eplb_model_state
) ):
if eplb_model_state.ep_buffer_ready and all_ranks_buffer_ready: _move_to_workspace(
self.move_to_workspace(
model_state=eplb_model_state, model_state=eplb_model_state,
ep_group=ep_group, ep_rank=ep_group.rank(),
is_profile=is_profile,
) )
if self.expert_rearrangement_step >= self.expert_rearrangement_step_interval: if self.expert_rearrangement_step >= self.expert_rearrangement_step_interval:
...@@ -846,18 +800,10 @@ class EplbState: ...@@ -846,18 +800,10 @@ class EplbState:
num_nodes=num_nodes, num_nodes=num_nodes,
num_gpus=num_gpus, num_gpus=num_gpus,
) )
# Record event after clone to signal async worker
# that load stats data is ready
sync_event = torch.cuda.Event()
sync_event.record()
eplb_model_state.window_ready_event = sync_event
eplb_model_state.rebalanced = True eplb_model_state.rebalanced = True
eplb_model_state.layer_to_transfer = 0
eplb_model_state.pending_global_ready_check = True
# Signal async thread to start transferring layers # Signal async thread to start transferring layers
if self.is_async and (not is_profile): if self.is_async and (not is_profile):
self.rearrange_event.set() self.rearrange_event.record()
return None return None
def start_async_loop( def start_async_loop(
...@@ -873,121 +819,27 @@ class EplbState: ...@@ -873,121 +819,27 @@ class EplbState:
is_profile=is_profile, is_profile=is_profile,
) )
def _all_ranks_buffer_ready(self, model_state: EplbModelState) -> bool: def _all_ranks_result_ready(self, model_state: EplbModelState) -> bool:
parallel_state = get_ep_group() parallel_state = get_ep_group()
has_result = int(model_state.pending_result is not None)
cpu_group = getattr(parallel_state, "cpu_group", None) cpu_group = getattr(parallel_state, "cpu_group", None)
if cpu_group is not None and cpu_group.size() > 1: if cpu_group is not None and cpu_group.size() > 1:
flag = torch.tensor( flag = torch.tensor((has_result,), dtype=torch.int32, device="cpu")
(int(model_state.ep_buffer_ready),), dtype=torch.int32, device="cpu"
)
all_reduce(flag, group=cpu_group) all_reduce(flag, group=cpu_group)
return int(flag.item()) == cpu_group.size() return int(flag.item()) == cpu_group.size()
device_group = parallel_state.device_group device_group = parallel_state.device_group
if device_group.size() <= 1: if device_group.size() <= 1:
return bool(model_state.ep_buffer_ready) return bool(has_result)
device = getattr( device = getattr(
parallel_state, "device", model_state.physical_to_logical_map.device parallel_state, "device", model_state.physical_to_logical_map.device
) )
flag = torch.tensor( flag = torch.tensor((has_result,), dtype=torch.int32, device=device)
(int(model_state.ep_buffer_ready),), dtype=torch.int32, device=device
)
all_reduce(flag, group=device_group) all_reduce(flag, group=device_group)
return int(flag.item()) == device_group.size() return int(flag.item()) == device_group.size()
def move_to_workspace(
self,
model_state: EplbModelState,
ep_group: ProcessGroup,
is_profile: bool = False,
):
# We call move_to_workspace only when ep_buffer_ready is 1.
# It means we only need to wait for the lock for a short time.
max_retries = 6 # 1 minute max
retries = 0
while not model_state.buffer_lock.acquire(blocking=True, timeout=10.0):
retries += 1
if retries >= max_retries:
raise RuntimeError(
f"Rank {ep_group.rank()}: buffer_lock timeout after "
"{max_retries * 10}s"
)
logger.warning(
"Rank %d: EPLB buffer_lock acquire failed, retrying (%d/%d)",
ep_group.rank(),
retries,
max_retries,
)
try:
assert model_state.new_physical_to_logical_map is not None
expert_weights = model_state.model.expert_weights[
model_state.layer_to_transfer
]
expert_weights_buffer = model_state.expert_buffer
new_indices = model_state.new_physical_to_logical_map[
model_state.layer_to_transfer
].numpy()
move_from_buffer(
expert_weights=expert_weights,
expert_weights_buffers=expert_weights_buffer,
is_unchanged=model_state.is_unchanged,
is_received_locally=model_state.is_received_locally,
recv_metadata=model_state.recv_metadata,
new_indices=new_indices,
ep_rank=ep_group.rank(),
)
transferred_layer = model_state.layer_to_transfer
transferred_layer = model_state.layer_to_transfer
assert model_state.new_physical_to_logical_map is not None
_commit_eplb_maps_for_layer(
model_state,
new_physical_to_logical_map=model_state.new_physical_to_logical_map,
layer=transferred_layer,
)
# Record event after consuming buffer to signal async thread
# that it's safe to overwrite the intermediate buffer
consumed_event = torch.cuda.Event()
consumed_event.record()
model_state.buffer_consumed_event = consumed_event
# After the main thread consumes, advance layer_to_transfer
model_state.layer_to_transfer += 1
model_state.ep_buffer_ready = 0
logger.debug(
"model %s successfully move_to_workspace layer %d",
model_state.model_name,
transferred_layer,
)
if model_state.layer_to_transfer >= model_state.model.num_moe_layers:
self.post_eplb(model_state)
model_state.rebalanced = False
model_state.layer_to_transfer = 0
model_state.pending_global_ready_check = False
logger.info(
"finish async transfer for model %s rank %d layer %d",
model_state.model_name,
ep_group.rank(),
model_state.model.num_moe_layers,
)
finally:
try:
model_state.buffer_lock.release()
except Exception as e:
logger.error(
"Rank %d: buffer_lock release failed in move_to_workspace: %s",
ep_group.rank(),
str(e),
)
def post_eplb(self, model_state: EplbModelState) -> None:
assert model_state.new_physical_to_logical_map is not None
model_state.new_physical_to_logical_map = None
def _allreduce_list(self, tensor_list: list[torch.Tensor]) -> list[torch.Tensor]: def _allreduce_list(self, tensor_list: list[torch.Tensor]) -> list[torch.Tensor]:
""" """
All-reduce a list of tensors. All-reduce a list of tensors.
...@@ -1225,7 +1077,7 @@ def _commit_eplb_maps_for_layer( ...@@ -1225,7 +1077,7 @@ def _commit_eplb_maps_for_layer(
""" """
# Commit physical_to_logical_map # Commit physical_to_logical_map
src = new_physical_to_logical_map[layer] src = new_physical_to_logical_map
dst = model_state.physical_to_logical_map[layer] dst = model_state.physical_to_logical_map[layer]
assert src.shape == dst.shape, ( assert src.shape == dst.shape, (
"The number of physical experts must stay the same while running Async EPLB. " "The number of physical experts must stay the same while running Async EPLB. "
...@@ -1284,3 +1136,33 @@ def _commit_eplb_maps( ...@@ -1284,3 +1136,33 @@ def _commit_eplb_maps(
src = new_replica_count src = new_replica_count
dst = model_state.logical_replica_count dst = model_state.logical_replica_count
dst.copy_(src, non_blocking=True) dst.copy_(src, non_blocking=True)
def _move_to_workspace(
model_state: EplbModelState,
ep_rank: int,
) -> None:
result = model_state.pending_result
assert result is not None
move_from_buffer(
expert_weights=model_state.model.expert_weights[result.layer_idx],
expert_weights_buffers=model_state.expert_buffer,
is_unchanged=result.is_unchanged,
is_received_locally=result.is_received_locally,
recv_metadata=result.recv_metadata,
new_indices=result.new_physical_to_logical_map.numpy(),
ep_rank=ep_rank,
)
_commit_eplb_maps_for_layer(
model_state,
new_physical_to_logical_map=result.new_physical_to_logical_map,
layer=result.layer_idx,
)
if result.layer_idx == model_state.model.num_moe_layers - 1:
model_state.rebalanced = False
# Reset pending_result before unblocking the async worker
model_state.pending_result = None
result.consumed_event.record()
...@@ -3,6 +3,9 @@ ...@@ -3,6 +3,9 @@
"""Utility functions for EPLB (Expert Parallel Load Balancing).""" """Utility functions for EPLB (Expert Parallel Load Balancing)."""
import os import os
import threading
import torch
from vllm.config import ParallelConfig from vllm.config import ParallelConfig
from vllm.logger import init_logger from vllm.logger import init_logger
...@@ -10,6 +13,54 @@ from vllm.logger import init_logger ...@@ -10,6 +13,54 @@ from vllm.logger import init_logger
logger = init_logger(__name__) logger = init_logger(__name__)
class CpuGpuEvent:
"""
Combines a CUDA event with a CPU threading event to enforce record->wait
ordering across two threads.
This class is designed for exactly two threads: one producer that calls
record() and one consumer that calls wait(). Using it with more than two
threads is not supported and will produce undefined behavior.
CUDA events alone are insufficient for cross-thread synchronization because
waiting on an unrecorded CUDA event is a no-op. The wait will return
immediately instead of blocking. This class adds a threading.Event so
that the waiting thread blocks on the CPU side until record() is called, at
which point the CUDA event is guaranteed to be in-flight and event.wait() will
correctly synchronize the GPU stream.
"""
def __init__(self):
self._event = torch.cuda.Event()
self._recorded = threading.Event()
def wait(self, stream: torch.cuda.Stream | None = None):
"""
Blocks the calling thread until record finishes. Used to guarantee that the
record kernel is called before wait.
Should only be called by the Async Eplb thread.
"""
self._recorded.wait()
self._event.wait(stream)
self._recorded.clear()
def record(self, stream: torch.cuda.Stream | None = None):
"""
Unblocks the waiting thread after calling event.record().
Should only be called by the main thread.
"""
if self._recorded.is_set():
raise RuntimeError(
"CpuGpuEvent.record() called before the previous event was "
"consumed by wait()"
)
self._event = torch.cuda.Event()
self._event.record(stream)
self._recorded.set()
def override_envs_for_eplb(parallel_config: ParallelConfig) -> None: def override_envs_for_eplb(parallel_config: ParallelConfig) -> None:
""" """
Override environment variables for EPLB when specific conditions are met. Override environment variables for EPLB when specific conditions are met.
......
...@@ -13,7 +13,11 @@ import numpy as np ...@@ -13,7 +13,11 @@ import numpy as np
import torch import torch
from torch.distributed import ProcessGroup, all_gather from torch.distributed import ProcessGroup, all_gather
from .eplb_communicator import EplbCommunicator from vllm.distributed.eplb.eplb_communicator import EplbCommunicator
from vllm.distributed.eplb.eplb_utils import CpuGpuEvent
from vllm.logger import init_logger
logger = init_logger(__name__)
@dataclass @dataclass
...@@ -34,6 +38,34 @@ class RecvMetadata: ...@@ -34,6 +38,34 @@ class RecvMetadata:
MoveToBufferResult = tuple[np.ndarray, np.ndarray, RecvMetadata] MoveToBufferResult = tuple[np.ndarray, np.ndarray, RecvMetadata]
@dataclass
class AsyncEplbLayerResult:
"""
The result of one completed async EPLB layer transfer.
"""
layer_idx: int
"""Index of the MoE layer that was transferred."""
new_physical_to_logical_map: torch.Tensor
"""
New physical→logical mapping for layers_idx, on CPU.
Shape: (num_physical_experts)
"""
is_unchanged: np.ndarray
"""Per-physical-expert flag: weight was not moved during transfer."""
is_received_locally: np.ndarray
"""Per-physical-expert flag: weight was received on this rank."""
recv_metadata: RecvMetadata
"""Metadata describing what was received during transfer_layer."""
consumed_event: CpuGpuEvent
"""
Event used to synchronize access to the intermediate buffer. The async worker calls
wait() after it finishes transferring weights to the intermediate buffer. The main
thread calls record() after it finishes transferring weights out of the intermediate
buffer in _move_to_workspace()
"""
def get_ep_ranks_with_experts_batch( def get_ep_ranks_with_experts_batch(
expert_ids: np.ndarray, expert_ids: np.ndarray,
num_local_experts: int, num_local_experts: int,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment