Unverified Commit 62b1bbe4 authored by Sage Moore's avatar Sage Moore Committed by GitHub
Browse files

[EPLB] Remove asyncio infrastructure from Async EPLB (#40730)


Signed-off-by: default avatarSage Moore <sage@neuralmagic.com>
parent fa4b7055
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
import random import random
import pytest import pytest
...@@ -361,16 +360,14 @@ def _test_async_transfer_layer_without_mtp_worker( ...@@ -361,16 +360,14 @@ def _test_async_transfer_layer_without_mtp_worker(
communicator.set_stream(cuda_stream) communicator.set_stream(cuda_stream)
for layer_idx in range(num_layers): for layer_idx in range(num_layers):
transfer_metadata = asyncio.run( transfer_metadata = transfer_layer(
transfer_layer( old_layer_indices=old_indices_cpu[layer_idx],
old_layer_indices=old_indices_cpu[layer_idx], new_layer_indices=new_indices_cpu[layer_idx],
new_layer_indices=new_indices_cpu[layer_idx], expert_weights=expert_weights[layer_idx],
expert_weights=expert_weights[layer_idx], expert_weights_buffer=expert_buffer,
expert_weights_buffer=expert_buffer, ep_group=ep_group,
ep_group=ep_group, communicator=communicator,
communicator=communicator, cuda_stream=cuda_stream,
cuda_stream=cuda_stream,
)
) )
cuda_stream.synchronize() cuda_stream.synchronize()
move_from_buffer( move_from_buffer(
......
...@@ -4,7 +4,6 @@ ...@@ -4,7 +4,6 @@
The async worker that transfers experts in the background. The async worker that transfers experts in the background.
""" """
import asyncio
import threading import threading
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
...@@ -36,21 +35,15 @@ def start_async_worker( ...@@ -36,21 +35,15 @@ def start_async_worker(
assert device_index is not None assert device_index is not None
torch.accelerator.set_device_index(device_index) torch.accelerator.set_device_index(device_index)
cuda_stream = torch.cuda.Stream(device=device_index) cuda_stream = torch.cuda.Stream(device=device_index)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
loop.run_until_complete( transfer_run_periodically(
transfer_run_periodically( state=state,
state=state, eplb_group=eplb_group,
eplb_group=eplb_group, cuda_stream=cuda_stream,
cuda_stream=cuda_stream, is_profile=is_profile,
is_profile=is_profile,
)
) )
except Exception as exc: # pragma: no cover - diagnostic path except Exception as exc: # pragma: no cover - diagnostic path
logger.exception("async loop error (Rank %d): %s", rank, str(exc)) logger.exception("async loop error (Rank %d): %s", rank, str(exc))
finally:
loop.close()
thread = threading.Thread(target=thread_target, daemon=True) thread = threading.Thread(target=thread_target, daemon=True)
thread.start() thread.start()
...@@ -83,7 +76,7 @@ def run_rebalance_experts( ...@@ -83,7 +76,7 @@ def run_rebalance_experts(
return new_physical_to_logical_map return new_physical_to_logical_map
async def transfer_run_periodically( def transfer_run_periodically(
state: "EplbState", state: "EplbState",
eplb_group: ProcessGroup, eplb_group: ProcessGroup,
cuda_stream: torch.cuda.Stream, cuda_stream: torch.cuda.Stream,
...@@ -118,7 +111,7 @@ async def transfer_run_periodically( ...@@ -118,7 +111,7 @@ async def transfer_run_periodically(
# model_state.expert_buffer, which will be consumed by the main thread in # model_state.expert_buffer, which will be consumed by the main thread in
# move_to_workspace # move_to_workspace
while model_state.rebalanced and layer_idx < num_layers: while model_state.rebalanced and layer_idx < num_layers:
transfer_metadata = await transfer_layer( transfer_metadata = transfer_layer(
old_layer_indices=physical_to_logical_map_cpu[layer_idx], old_layer_indices=physical_to_logical_map_cpu[layer_idx],
new_layer_indices=new_physical_to_logical_map[layer_idx], new_layer_indices=new_physical_to_logical_map[layer_idx],
expert_weights=model_state.model.expert_weights[layer_idx], expert_weights=model_state.model.expert_weights[layer_idx],
......
...@@ -418,7 +418,7 @@ def move_from_buffer( ...@@ -418,7 +418,7 @@ def move_from_buffer(
w[dst].copy_(w[src], non_blocking=True) w[dst].copy_(w[src], non_blocking=True)
async def transfer_layer( def transfer_layer(
old_layer_indices: torch.Tensor, old_layer_indices: torch.Tensor,
new_layer_indices: torch.Tensor, new_layer_indices: torch.Tensor,
expert_weights: Sequence[torch.Tensor], expert_weights: Sequence[torch.Tensor],
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment