Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
82ae97d2
Commit
82ae97d2
authored
May 08, 2026
by
wangmin6
Browse files
Revert "去掉PP异步调度相关代码"
parent
dad7d083
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
57 additions
and
9 deletions
+57
-9
vllm/v1/core/sched/scheduler.py
vllm/v1/core/sched/scheduler.py
+8
-8
vllm/v1/worker/gpu_model_runner.py
vllm/v1/worker/gpu_model_runner.py
+49
-1
No files found.
vllm/v1/core/sched/scheduler.py
View file @
82ae97d2
...
@@ -365,14 +365,14 @@ class Scheduler(SchedulerInterface):
...
@@ -365,14 +365,14 @@ class Scheduler(SchedulerInterface):
# do not schedule another step for the same request while it still has
# do not schedule another step for the same request while it still has
# output placeholders for PP.
# output placeholders for PP.
# TODO: support PP + async scheduling without this limit
# TODO: support PP + async scheduling without this limit
if
self
.
use_pp
:
#
if self.use_pp:
if
(
envs
.
VLLM_USE_PP_BALANCE
and
#
if (envs.VLLM_USE_PP_BALANCE and
len
(
scheduled_new_reqs
)
+
len
(
scheduled_resumed_reqs
)
#
len(scheduled_new_reqs) + len(scheduled_resumed_reqs)
+
len
(
scheduled_running_reqs
)
>=
max_batch_running
):
#
+ len(scheduled_running_reqs) >= max_batch_running):
break
#
break
if
request
.
num_output_placeholders
>
0
:
#
if request.num_output_placeholders > 0:
req_index
+=
1
#
req_index += 1
continue
#
continue
if
(
if
(
request
.
num_output_placeholders
>
0
request
.
num_output_placeholders
>
0
...
...
vllm/v1/worker/gpu_model_runner.py
View file @
82ae97d2
...
@@ -4070,7 +4070,9 @@ class GPUModelRunner(
...
@@ -4070,7 +4070,9 @@ class GPUModelRunner(
self
.
kv_connector_output
=
None
self
.
kv_connector_output
=
None
if
self
.
execute_model_state
is
None
:
if
self
.
execute_model_state
is
None
:
# Nothing to do (PP non-final rank case), output isn't used.
# receive sampled token ids from the last PP rank.
if
self
.
use_async_scheduling
and
get_pp_group
().
world_size
>
1
:
self
.
_pp_receive_prev_sampled_token_ids_to_input_batch
()
if
not
kv_connector_output
:
if
not
kv_connector_output
:
return
None
# type: ignore[return-value]
return
None
# type: ignore[return-value]
...
@@ -4112,6 +4114,13 @@ class GPUModelRunner(
...
@@ -4112,6 +4114,13 @@ class GPUModelRunner(
sampler_output
.
sampled_token_ids
,
scheduler_output
sampler_output
.
sampled_token_ids
,
scheduler_output
)
)
if
self
.
use_async_scheduling
:
pp
=
get_pp_group
()
if
pp
.
world_size
>
1
and
pp
.
is_last_rank
:
self
.
_pp_broadcast_prev_sampled_token_ids
(
sampler_output
.
sampled_token_ids
)
self
.
_draft_token_ids
=
None
self
.
_draft_token_ids
=
None
self
.
_draft_token_req_ids
=
None
self
.
_draft_token_req_ids
=
None
self
.
input_batch
.
prev_sampled_token_ids
=
None
self
.
input_batch
.
prev_sampled_token_ids
=
None
...
@@ -4271,6 +4280,45 @@ class GPUModelRunner(
...
@@ -4271,6 +4280,45 @@ class GPUModelRunner(
)
)
return
async_output
return
async_output
def _pp_broadcast_prev_sampled_token_ids(
    self, sampled_token_ids: torch.Tensor
) -> None:
    """Send the sampled token ids from the last PP rank to the PP group.

    Only valid on the last pipeline-parallel rank (asserted below). The
    tensor is passed unchanged to ``torch.distributed.broadcast`` over the
    PP device group, with this rank as the broadcast source.

    Args:
        sampled_token_ids: Tensor of sampled token ids; it must be 2-D
            with a trailing dimension of 1, i.e. ``[num_reqs, 1]``.
    """
    pp_group = get_pp_group()
    assert pp_group.is_last_rank

    # `prev_sampled_token_ids` is expected to have shape [num_reqs, 1].
    has_expected_shape = (
        sampled_token_ids.dim() == 2 and sampled_token_ids.shape[-1] == 1
    )
    assert has_expected_shape, (
        "PP+async expects sampled_token_ids to have shape [num_reqs, 1]"
    )

    torch.distributed.broadcast(
        sampled_token_ids,
        src=pp_group.rank,
        group=pp_group.device_group,
    )
def _pp_receive_prev_sampled_token_ids_to_input_batch(self) -> None:
    """Receive the sampled token ids broadcast by the last PP rank.

    Only valid on a non-last pipeline-parallel rank (asserted below).
    The method:

    1. Allocates an int32 ``[num_reqs, 1]`` buffer on ``self.device`` and
       fills it via ``torch.distributed.broadcast`` from ``pp.last_rank``.
    2. Stores that buffer as ``input_batch.prev_sampled_token_ids``.
    3. Rebuilds ``input_batch.prev_req_id_to_index`` from the current
       ``input_batch.req_ids``, leaving out rows flagged in
       ``discard_request_mask``.
    4. Appends a ``-1`` placeholder to ``output_token_ids`` of every kept
       request that is present in ``self.requests``.
    """
    pp_group = get_pp_group()
    assert not pp_group.is_last_rank

    batch = self.input_batch
    num_reqs = batch.num_reqs

    # `prev_sampled_token_ids` is expected to have shape [num_reqs, 1].
    received = torch.empty(
        (num_reqs, 1), dtype=torch.int32, device=self.device
    )
    torch.distributed.broadcast(
        received,
        src=pp_group.last_rank,
        group=pp_group.device_group,
    )
    batch.prev_sampled_token_ids = received

    # Build `prev_req_id_to_index` here so `_prepare_input_ids` can map
    # req_id -> previous batch row.
    discarded_rows = set(
        np.nonzero(self.discard_request_mask.np[:num_reqs])[0]
    )
    row_of_req: dict[str, int] = {}
    for row, req_id in enumerate(batch.req_ids):
        if row not in discarded_rows:
            row_of_req[req_id] = row
            # PP+async scheduling: advance the per-request local cached
            # output length by appending a placeholder (-1) token id.
            cached_state = self.requests.get(req_id)
            if cached_state is not None:
                cached_state.output_token_ids.append(-1)

    batch.prev_req_id_to_index = row_of_req
def
take_draft_token_ids
(
self
)
->
DraftTokenIds
|
None
:
def
take_draft_token_ids
(
self
)
->
DraftTokenIds
|
None
:
if
not
self
.
num_spec_tokens
or
not
self
.
_draft_token_req_ids
:
if
not
self
.
num_spec_tokens
or
not
self
.
_draft_token_req_ids
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment