Commit dad7d083 authored by 王敏's avatar 王敏
Browse files

去掉PP异步调度相关代码

parent ca35113a
......@@ -365,14 +365,14 @@ class Scheduler(SchedulerInterface):
# do not schedule another step for the same request while it still has
# output placeholders for PP.
# TODO: support PP + async scheduling without this limit
# if self.use_pp:
# if (envs.VLLM_USE_PP_BALANCE and
# len(scheduled_new_reqs) + len(scheduled_resumed_reqs)
# + len(scheduled_running_reqs) >= max_batch_running):
# break
# if request.num_output_placeholders > 0:
# req_index += 1
# continue
if self.use_pp:
if (envs.VLLM_USE_PP_BALANCE and
len(scheduled_new_reqs) + len(scheduled_resumed_reqs)
+ len(scheduled_running_reqs) >= max_batch_running):
break
if request.num_output_placeholders > 0:
req_index += 1
continue
if (
request.num_output_placeholders > 0
......
......@@ -4070,9 +4070,7 @@ class GPUModelRunner(
self.kv_connector_output = None
if self.execute_model_state is None:
# receive sampled token ids from the last PP rank.
if self.use_async_scheduling and get_pp_group().world_size > 1:
self._pp_receive_prev_sampled_token_ids_to_input_batch()
# Nothing to do (PP non-final rank case), output isn't used.
if not kv_connector_output:
return None # type: ignore[return-value]
......@@ -4114,13 +4112,6 @@ class GPUModelRunner(
sampler_output.sampled_token_ids, scheduler_output
)
if self.use_async_scheduling:
pp = get_pp_group()
if pp.world_size > 1 and pp.is_last_rank:
self._pp_broadcast_prev_sampled_token_ids(
sampler_output.sampled_token_ids
)
self._draft_token_ids = None
self._draft_token_req_ids = None
self.input_batch.prev_sampled_token_ids = None
......@@ -4280,45 +4271,6 @@ class GPUModelRunner(
)
return async_output
def _pp_broadcast_prev_sampled_token_ids(
self, sampled_token_ids: torch.Tensor
) -> None:
"""Broadcast sampled token ids (GPU) from last PP stage"""
pp = get_pp_group()
assert pp.is_last_rank
# `prev_sampled_token_ids` is expected to have shape [num_reqs, 1].
assert sampled_token_ids.dim() == 2 and sampled_token_ids.shape[-1] == 1, (
"PP+async expects sampled_token_ids to have shape [num_reqs, 1]"
)
torch.distributed.broadcast(
sampled_token_ids, src=pp.rank, group=pp.device_group
)
def _pp_receive_prev_sampled_token_ids_to_input_batch(self) -> None:
"""Receive sampled token ids broadcast from last PP stage"""
pp = get_pp_group()
assert not pp.is_last_rank
num_reqs = self.input_batch.num_reqs
# `prev_sampled_token_ids` is expected to have shape [num_reqs, 1].
recv = torch.empty((num_reqs, 1), dtype=torch.int32, device=self.device)
torch.distributed.broadcast(recv, src=pp.last_rank, group=pp.device_group)
self.input_batch.prev_sampled_token_ids = recv
# construct `prev_req_id_to_index` here so `_prepare_input_ids`
# can map req_id -> previous batch row
discard_req_indices = np.nonzero(self.discard_request_mask.np[:num_reqs])[0]
discard_req_indices_set = set(discard_req_indices)
prev_req_id_to_index: dict[str, int] = {}
for i, req_id in enumerate(self.input_batch.req_ids):
if i in discard_req_indices_set:
continue
prev_req_id_to_index[req_id] = i
# PP+async scheduling: advance per-request local cached output length by
# appending a placeholder (-1) token id.
if (req_state := self.requests.get(req_id)) is not None:
req_state.output_token_ids.append(-1)
self.input_batch.prev_req_id_to_index = prev_req_id_to_index
def take_draft_token_ids(self) -> DraftTokenIds | None:
if not self.num_spec_tokens or not self._draft_token_req_ids:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment