Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
00e13357
Commit
00e13357
authored
Aug 01, 2025
by
zhuwenwen
Browse files
[feat]支持v1 engine mtp cudagraph
parent
3de379de
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
482 additions
and
10 deletions
+482
-10
vllm/config.py
vllm/config.py
+5
-0
vllm/v1/attention/backends/mla/common.py
vllm/v1/attention/backends/mla/common.py
+44
-9
vllm/v1/attention/backends/utils.py
vllm/v1/attention/backends/utils.py
+2
-0
vllm/v1/core/sched/scheduler.py
vllm/v1/core/sched/scheduler.py
+424
-1
vllm/v1/worker/gpu_model_runner.py
vllm/v1/worker/gpu_model_runner.py
+7
-0
No files found.
vllm/config.py
View file @
00e13357
...
@@ -4801,6 +4801,11 @@ class VllmConfig:
...
@@ -4801,6 +4801,11 @@ class VllmConfig:
size
for
size
in
batch_size_capture_list
size
for
size
in
batch_size_capture_list
if
size
<=
max_num_tokens
if
size
<=
max_num_tokens
]
]
# add for spec decode
if
self
.
speculative_config
is
not
None
and
self
.
speculative_config
.
num_lookahead_slots
>
0
:
batch_size_capture_list
=
list
(
map
(
lambda
x
:
x
*
(
1
+
self
.
speculative_config
.
num_lookahead_slots
),
batch_size_capture_list
))
self
.
compilation_config
.
init_with_cudagraph_sizes
(
self
.
compilation_config
.
init_with_cudagraph_sizes
(
batch_size_capture_list
)
batch_size_capture_list
)
...
...
vllm/v1/attention/backends/mla/common.py
View file @
00e13357
...
@@ -488,6 +488,10 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
...
@@ -488,6 +488,10 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
self
.
use_spec_decode
=
False
self
.
use_spec_decode
=
False
self
.
num_scheduled_tokens_np
=
np
.
zeros
(
scheduler_config
.
max_num_seqs
,
dtype
=
np
.
int32
)
self
.
num_scheduled_tokens_np
=
np
.
zeros
(
scheduler_config
.
max_num_seqs
,
dtype
=
np
.
int32
)
# support for cudagraph spec docoding
self
.
spec_decode_block_table_tensor
=
None
self
.
spec_decode_seq_lens
=
None
def
_build_fi_prefill_wrappers
(
self
,
prefill
:
FlashInferPrefillMetadata
):
def
_build_fi_prefill_wrappers
(
self
,
prefill
:
FlashInferPrefillMetadata
):
qo_indptr
=
prefill
.
query_start_loc
qo_indptr
=
prefill
.
query_start_loc
...
@@ -589,11 +593,30 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
...
@@ -589,11 +593,30 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
Currently, only decode is supported for full cudagraphs with MLA.
Currently, only decode is supported for full cudagraphs with MLA.
"""
"""
m
=
common_attn_metadata
m
=
common_attn_metadata
assert
m
.
num_reqs
==
m
.
num_actual_tokens
,
\
#
assert m.num_reqs == m.num_actual_tokens, \
"MLA only supports decode-only full CUDAGraph capture. "
\
#
"MLA only supports decode-only full CUDAGraph capture. " \
"Make sure all cudagraph capture sizes <= max_num_seq."
#
"Make sure all cudagraph capture sizes <= max_num_seq."
m
.
max_query_len
=
1
# decode-only
# m.max_query_len = 1 # decode-only
self
.
use_spec_decode
=
m
.
num_speculative_tokens
>
0
# support for cudagraph spec docoding
if
self
.
use_spec_decode
:
for
i
in
range
(
m
.
num_reqs
):
self
.
num_scheduled_tokens_np
[
i
]
=
m
.
num_actual_tokens
//
m
.
num_reqs
if
self
.
spec_decode_block_table_tensor
is
None
:
max_num_reqs
=
m
.
seq_lens
.
shape
[
0
]
block_table_tensor
=
self
.
block_table
.
get_device_tensor
()
tokens_per_seq
=
1
+
m
.
num_speculative_tokens
self
.
spec_decode_block_table_tensor
=
torch
.
zeros
((
block_table_tensor
.
shape
[
0
]
*
tokens_per_seq
,
block_table_tensor
.
shape
[
1
]),
dtype
=
block_table_tensor
.
dtype
,
device
=
m
.
seq_lens
.
device
)
self
.
spec_decode_seq_lens
=
torch
.
zeros
(
max_num_reqs
*
tokens_per_seq
,
dtype
=
m
.
seq_lens
.
dtype
,
device
=
m
.
seq_lens
.
device
)
return
self
.
build
(
0
,
m
)
return
self
.
build
(
0
,
m
)
...
@@ -742,10 +765,19 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
...
@@ -742,10 +765,19 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
seq_lens
.
device
,
non_blocking
=
True
)
seq_lens
.
device
,
non_blocking
=
True
)
decode_seq_lens
=
decode_seq_lens
-
seq_lens_minus
decode_seq_lens
=
decode_seq_lens
-
seq_lens_minus
decode_metadata
=
self
.
_build_decode
(
if
self
.
spec_decode_block_table_tensor
is
not
None
:
block_table_tensor
=
decode_block_table_tensor
,
self
.
spec_decode_block_table_tensor
[:
self
.
_num_decode_tokens
].
copy_
(
decode_block_table_tensor
)
seq_lens
=
decode_seq_lens
,
self
.
spec_decode_seq_lens
[:
self
.
_num_decode_tokens
].
copy_
(
decode_seq_lens
)
)
decode_metadata
=
self
.
_build_decode
(
block_table_tensor
=
self
.
spec_decode_block_table_tensor
[:
self
.
_num_decode_tokens
,
...],
seq_lens
=
self
.
spec_decode_seq_lens
[:
self
.
_num_decode_tokens
],
)
else
:
decode_metadata
=
self
.
_build_decode
(
block_table_tensor
=
decode_block_table_tensor
,
seq_lens
=
decode_seq_lens
,
)
else
:
else
:
decode_metadata
=
self
.
_build_decode
(
decode_metadata
=
self
.
_build_decode
(
block_table_tensor
=
block_table_tensor
[:
num_decodes
,
...],
block_table_tensor
=
block_table_tensor
[:
num_decodes
,
...],
...
@@ -775,7 +807,10 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
...
@@ -775,7 +807,10 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
def
can_run_in_cudagraph
(
def
can_run_in_cudagraph
(
self
,
common_attn_metadata
:
CommonAttentionMetadata
)
->
bool
:
self
,
common_attn_metadata
:
CommonAttentionMetadata
)
->
bool
:
return
common_attn_metadata
.
max_query_len
==
1
# return common_attn_metadata.max_query_len == 1
if
not
self
.
use_spec_decode
:
return
common_attn_metadata
.
max_query_len
==
1
return
self
.
_num_prefills
==
0
class
MLACommonImpl
(
MLAAttentionImpl
[
M
],
Generic
[
M
]):
class
MLACommonImpl
(
MLAAttentionImpl
[
M
],
Generic
[
M
]):
...
...
vllm/v1/attention/backends/utils.py
View file @
00e13357
...
@@ -55,6 +55,8 @@ class CommonAttentionMetadata:
...
@@ -55,6 +55,8 @@ class CommonAttentionMetadata:
"""Longest query in batch"""
"""Longest query in batch"""
num_rejected_tokens
:
list
[
int
]
num_rejected_tokens
:
list
[
int
]
"""(batch_size,), record the rejected tokens number in cpu and gpu"""
"""(batch_size,), record the rejected tokens number in cpu and gpu"""
num_speculative_tokens
:
int
=
0
"""Number of speculative tokens"""
block_table_tensor
:
torch
.
Tensor
block_table_tensor
:
torch
.
Tensor
slot_mapping
:
torch
.
Tensor
slot_mapping
:
torch
.
Tensor
...
...
vllm/v1/core/sched/scheduler.py
View file @
00e13357
...
@@ -141,6 +141,7 @@ class Scheduler(SchedulerInterface):
...
@@ -141,6 +141,7 @@ class Scheduler(SchedulerInterface):
cache_size
=
encoder_cache_size
)
cache_size
=
encoder_cache_size
)
speculative_config
=
vllm_config
.
speculative_config
speculative_config
=
vllm_config
.
speculative_config
self
.
speculative_config
=
speculative_config
self
.
use_eagle
=
False
self
.
use_eagle
=
False
self
.
num_spec_tokens
=
self
.
num_lookahead_tokens
=
0
self
.
num_spec_tokens
=
self
.
num_lookahead_tokens
=
0
...
@@ -162,7 +163,7 @@ class Scheduler(SchedulerInterface):
...
@@ -162,7 +163,7 @@ class Scheduler(SchedulerInterface):
)
)
self
.
use_pp
=
self
.
parallel_config
.
pipeline_parallel_size
>
1
self
.
use_pp
=
self
.
parallel_config
.
pipeline_parallel_size
>
1
def
schedule
(
self
)
->
SchedulerOutput
:
def
schedule
_default
(
self
)
->
SchedulerOutput
:
# NOTE(woosuk) on the scheduling algorithm:
# NOTE(woosuk) on the scheduling algorithm:
# There's no "decoding phase" nor "prefill phase" in the scheduler.
# There's no "decoding phase" nor "prefill phase" in the scheduler.
# Each request just has the num_computed_tokens and
# Each request just has the num_computed_tokens and
...
@@ -582,6 +583,428 @@ class Scheduler(SchedulerInterface):
...
@@ -582,6 +583,428 @@ class Scheduler(SchedulerInterface):
self
.
_update_after_schedule
(
scheduler_output
)
self
.
_update_after_schedule
(
scheduler_output
)
return
scheduler_output
return
scheduler_output
def
schedule_split_pd
(
self
)
->
SchedulerOutput
:
# Give priority to scheduling waiting requests
scheduled_new_reqs
:
list
[
Request
]
=
[]
scheduled_resumed_reqs
:
list
[
Request
]
=
[]
scheduled_running_reqs
:
list
[
Request
]
=
[]
preempted_reqs
:
list
[
Request
]
=
[]
# NOTE: structured_output_request_ids maps
# a request's (request that uses structured output)
# request_id to the running request index.
# This will helps us determine to slice the grammar bitmask
# and only applies valid mask for requests that
# uses structured decoding.
structured_output_request_ids
:
dict
[
str
,
int
]
=
{}
req_to_new_block_ids
:
dict
[
str
,
tuple
[
list
[
int
],
...]]
=
{}
num_scheduled_tokens
:
dict
[
str
,
int
]
=
{}
token_budget
=
self
.
max_num_scheduled_tokens
# Encoder-related.
scheduled_encoder_inputs
:
dict
[
str
,
list
[
int
]]
=
{}
encoder_budget
=
self
.
max_num_encoder_input_tokens
# Spec decode-related.
scheduled_spec_decode_tokens
:
dict
[
str
,
list
[
int
]]
=
{}
# For logging.
scheduled_timestamp
=
time
.
monotonic
()
# Use a temporary RequestQueue to collect requests that need to be
# skipped and put back at the head of the waiting queue later
skipped_waiting_requests
=
create_request_queue
(
self
.
policy
)
req_index
=
len
(
self
.
running
)
# First, schedule the WAITING requests.
while
self
.
waiting
and
token_budget
>
0
:
if
len
(
self
.
running
)
==
self
.
max_num_running_reqs
:
break
request
=
self
.
waiting
.
peek_request
()
# KVTransfer: skip request if still waiting for remote kvs.
if
request
.
status
==
RequestStatus
.
WAITING_FOR_REMOTE_KVS
:
is_ready
=
self
.
_update_waiting_for_remote_kv
(
request
)
if
is_ready
:
request
.
status
=
RequestStatus
.
WAITING
else
:
logger
.
debug
(
"%s is still in WAITING_FOR_REMOTE_KVS state."
,
request
.
request_id
)
self
.
waiting
.
pop_request
()
skipped_waiting_requests
.
prepend_request
(
request
)
continue
# Skip request if the structured output request is still waiting
# for FSM compilation.
if
request
.
status
==
RequestStatus
.
WAITING_FOR_FSM
:
structured_output_req
=
request
.
structured_output_request
if
structured_output_req
and
structured_output_req
.
grammar
:
request
.
status
=
RequestStatus
.
WAITING
else
:
self
.
waiting
.
pop_request
()
skipped_waiting_requests
.
prepend_request
(
request
)
continue
# Check that adding the request still respects the max_loras
# constraint.
if
(
self
.
lora_config
and
request
.
lora_request
and
(
len
(
scheduled_loras
)
==
self
.
lora_config
.
max_loras
and
request
.
lora_request
.
lora_int_id
not
in
scheduled_loras
)):
# Scheduling would exceed max_loras, skip.
self
.
waiting
.
pop_request
()
skipped_waiting_requests
.
prepend_request
(
request
)
continue
num_external_computed_tokens
=
0
load_kv_async
=
False
# Get already-cached tokens.
if
request
.
num_computed_tokens
==
0
:
# Get locally-cached tokens.
new_computed_blocks
,
num_new_local_computed_tokens
=
\
self
.
kv_cache_manager
.
get_computed_blocks
(
request
)
# Get externally-cached tokens if using a KVConnector.
if
self
.
connector
is
not
None
:
num_external_computed_tokens
,
load_kv_async
=
(
self
.
connector
.
get_num_new_matched_tokens
(
request
,
num_new_local_computed_tokens
))
# Total computed tokens (local + external).
num_computed_tokens
=
(
num_new_local_computed_tokens
+
num_external_computed_tokens
)
# KVTransfer: WAITING reqs have num_computed_tokens > 0
# after async KV recvs are completed.
else
:
new_computed_blocks
=
(
self
.
kv_cache_manager
.
create_empty_block_list
())
num_new_local_computed_tokens
=
0
num_computed_tokens
=
request
.
num_computed_tokens
encoder_inputs_to_schedule
=
None
new_encoder_budget
=
encoder_budget
# KVTransfer: loading remote KV, do not allocate for new work.
if
load_kv_async
:
assert
num_external_computed_tokens
>
0
num_new_tokens
=
0
# Number of tokens to be scheduled.
else
:
# We use `request.num_tokens` instead of
# `request.num_prompt_tokens` to consider the resumed
# requests, which have output tokens.
num_new_tokens
=
request
.
num_tokens
-
num_computed_tokens
if
(
0
<
self
.
scheduler_config
.
long_prefill_token_threshold
<
num_new_tokens
):
num_new_tokens
=
(
self
.
scheduler_config
.
long_prefill_token_threshold
)
# chunked prefill has to be enabled explicitly to allow
# pooling requests to be chunked
if
not
self
.
scheduler_config
.
chunked_prefill_enabled
and
\
num_new_tokens
>
token_budget
:
self
.
waiting
.
pop_request
()
skipped_waiting_requests
.
prepend_request
(
request
)
continue
num_new_tokens
=
min
(
num_new_tokens
,
token_budget
)
assert
num_new_tokens
>
0
# Schedule encoder inputs.
if
request
.
has_encoder_inputs
:
(
encoder_inputs_to_schedule
,
num_new_tokens
,
new_encoder_budget
)
=
self
.
_try_schedule_encoder_inputs
(
request
,
num_computed_tokens
,
num_new_tokens
,
encoder_budget
)
if
num_new_tokens
==
0
:
# The request cannot be scheduled.
break
new_blocks
=
self
.
kv_cache_manager
.
allocate_slots
(
request
,
num_new_tokens
+
num_external_computed_tokens
,
num_new_local_computed_tokens
,
new_computed_blocks
,
num_lookahead_tokens
=
self
.
num_lookahead_tokens
,
delay_cache_blocks
=
load_kv_async
,
)
if
new_blocks
is
None
:
# The request cannot be scheduled.
break
# KVTransfer: the connector uses this info to determine
# if a load is needed. Note that
# This information is used to determine if a load is
# needed for this request.
if
self
.
connector
is
not
None
:
self
.
connector
.
update_state_after_alloc
(
request
,
new_computed_blocks
+
new_blocks
,
num_external_computed_tokens
,
)
# Request was already popped from self.waiting
# unless it was re-added above due to new_blocks being None.
request
=
self
.
waiting
.
pop_request
()
if
load_kv_async
:
# If loading async, allocate memory and put request
# into the WAITING_FOR_REMOTE_KV state.
skipped_waiting_requests
.
prepend_request
(
request
)
request
.
status
=
RequestStatus
.
WAITING_FOR_REMOTE_KVS
continue
if
request
.
use_structured_output
:
structured_output_request_ids
[
request
.
request_id
]
=
(
req_index
)
req_index
+=
1
self
.
running
.
append
(
request
)
if
self
.
log_stats
:
request
.
record_event
(
EngineCoreEventType
.
SCHEDULED
,
scheduled_timestamp
)
if
request
.
status
==
RequestStatus
.
WAITING
:
scheduled_new_reqs
.
append
(
request
)
elif
request
.
status
==
RequestStatus
.
PREEMPTED
:
scheduled_resumed_reqs
.
append
(
request
)
else
:
raise
RuntimeError
(
f
"Invalid request status:
{
request
.
status
}
"
)
if
self
.
lora_config
and
request
.
lora_request
:
scheduled_loras
.
add
(
request
.
lora_request
.
lora_int_id
)
req_to_new_block_ids
[
request
.
request_id
]
=
(
self
.
kv_cache_manager
.
get_block_ids
(
request
.
request_id
))
num_scheduled_tokens
[
request
.
request_id
]
=
num_new_tokens
token_budget
-=
num_new_tokens
request
.
status
=
RequestStatus
.
RUNNING
request
.
num_computed_tokens
=
num_computed_tokens
# Count the number of prefix cached tokens.
if
request
.
num_cached_tokens
<
0
:
request
.
num_cached_tokens
=
num_computed_tokens
# Encoder-related.
if
encoder_inputs_to_schedule
:
scheduled_encoder_inputs
[
request
.
request_id
]
=
(
encoder_inputs_to_schedule
)
# Allocate the encoder cache.
for
i
in
encoder_inputs_to_schedule
:
self
.
encoder_cache_manager
.
allocate
(
request
,
i
)
encoder_budget
=
new_encoder_budget
# Put back any skipped requests at the head of the waiting queue
if
skipped_waiting_requests
:
self
.
waiting
.
prepend_requests
(
skipped_waiting_requests
)
# Next, schedule the RUNNING requests.
if
not
scheduled_new_reqs
and
not
scheduled_resumed_reqs
:
req_index
=
0
while
req_index
<
len
(
self
.
running
)
and
token_budget
>
0
:
request
=
self
.
running
[
req_index
]
num_new_tokens
=
(
request
.
num_tokens_with_spec
-
request
.
num_computed_tokens
)
if
(
0
<
self
.
scheduler_config
.
long_prefill_token_threshold
<
num_new_tokens
):
num_new_tokens
=
(
self
.
scheduler_config
.
long_prefill_token_threshold
)
num_new_tokens
=
min
(
num_new_tokens
,
token_budget
)
# Make sure the input position does not exceed the max model len.
# This is necessary when using spec decoding.
num_new_tokens
=
min
(
num_new_tokens
,
self
.
max_model_len
-
1
-
request
.
num_computed_tokens
)
# Schedule encoder inputs.
encoder_inputs_to_schedule
=
None
new_encoder_budget
=
encoder_budget
if
request
.
has_encoder_inputs
:
(
encoder_inputs_to_schedule
,
num_new_tokens
,
new_encoder_budget
)
=
self
.
_try_schedule_encoder_inputs
(
request
,
request
.
num_computed_tokens
,
num_new_tokens
,
encoder_budget
)
if
num_new_tokens
==
0
:
# The request cannot be scheduled because one of the following
# reasons:
# 1. No new tokens to schedule. This may happen when PP>1 and
# we have already scheduled all prompt tokens but they are
# not finished yet.
# 2. The encoder budget is exhausted.
# 3. The encoder cache is exhausted.
# NOTE(woosuk): Here, by doing `continue` instead of `break`,
# we do not strictly follow the FCFS scheduling policy and
# allow the lower-priority requests to be scheduled.
req_index
+=
1
continue
num_draft_tokens
=
max
(
num_new_tokens
+
request
.
num_computed_tokens
-
request
.
num_tokens
,
0
)
while
True
:
new_blocks
=
self
.
kv_cache_manager
.
allocate_slots
(
request
,
num_new_tokens
,
num_draft_tokens
=
num_draft_tokens
,
num_lookahead_tokens
=
self
.
num_lookahead_tokens
)
if
new_blocks
is
None
:
# The request cannot be scheduled.
# Preempt the lowest-priority request.
if
self
.
policy
==
SchedulingPolicy
.
PRIORITY
:
preempted_req
=
max
(
self
.
running
,
key
=
lambda
r
:
(
r
.
priority
,
r
.
arrival_time
),
)
self
.
running
.
remove
(
preempted_req
)
else
:
preempted_req
=
self
.
running
.
pop
()
self
.
kv_cache_manager
.
free
(
preempted_req
)
preempted_req
.
status
=
RequestStatus
.
PREEMPTED
preempted_req
.
num_computed_tokens
=
0
if
self
.
log_stats
:
preempted_req
.
record_event
(
EngineCoreEventType
.
PREEMPTED
,
scheduled_timestamp
)
self
.
waiting
.
prepend_request
(
preempted_req
)
preempted_reqs
.
append
(
preempted_req
)
if
preempted_req
==
request
:
# No more request to preempt.
can_schedule
=
False
break
else
:
# The request can be scheduled.
can_schedule
=
True
break
if
not
can_schedule
:
break
assert
new_blocks
is
not
None
# Schedule the request.
scheduled_running_reqs
.
append
(
request
)
if
request
.
use_structured_output
:
# PERF: in case of chunked prefill,
# request might not include any new tokens.
# Therefore, we might introduce some additional
# cycle to fill in the bitmask, which could be a big no-op.
structured_output_request_ids
[
request
.
request_id
]
=
req_index
req_to_new_block_ids
[
request
.
request_id
]
=
(
new_blocks
.
get_block_ids
())
num_scheduled_tokens
[
request
.
request_id
]
=
num_new_tokens
token_budget
-=
num_new_tokens
req_index
+=
1
# Speculative decode related.
if
request
.
spec_token_ids
:
num_scheduled_spec_tokens
=
(
num_new_tokens
+
request
.
num_computed_tokens
-
request
.
num_tokens
)
if
num_scheduled_spec_tokens
>
0
:
# Trim spec_token_ids list to num_scheduled_spec_tokens.
del
request
.
spec_token_ids
[
num_scheduled_spec_tokens
:]
scheduled_spec_decode_tokens
[
request
.
request_id
]
=
(
request
.
spec_token_ids
)
# Encoder-related.
if
encoder_inputs_to_schedule
:
scheduled_encoder_inputs
[
request
.
request_id
]
=
(
encoder_inputs_to_schedule
)
# Allocate the encoder cache.
for
i
in
encoder_inputs_to_schedule
:
self
.
encoder_cache_manager
.
allocate
(
request
,
i
)
encoder_budget
=
new_encoder_budget
# Record the LoRAs in scheduled_running_reqs
scheduled_loras
:
set
[
int
]
=
set
()
if
self
.
lora_config
:
scheduled_loras
=
set
(
req
.
lora_request
.
lora_int_id
for
req
in
scheduled_running_reqs
if
req
.
lora_request
and
req
.
lora_request
.
lora_int_id
>
0
)
assert
len
(
scheduled_loras
)
<=
self
.
lora_config
.
max_loras
# Check if the scheduling constraints are satisfied.
total_num_scheduled_tokens
=
sum
(
num_scheduled_tokens
.
values
())
assert
total_num_scheduled_tokens
<=
self
.
max_num_scheduled_tokens
assert
token_budget
>=
0
assert
len
(
self
.
running
)
<=
self
.
max_num_running_reqs
# Since some requests in the RUNNING queue may not be scheduled in
# this step, the total number of scheduled requests can be smaller than
# len(self.running).
assert
(
len
(
scheduled_new_reqs
)
+
len
(
scheduled_resumed_reqs
)
+
len
(
scheduled_running_reqs
)
<=
len
(
self
.
running
))
# Get the longest common prefix among all requests in the running queue.
# This can be potentially used for cascade attention.
num_common_prefix_blocks
=
[
0
]
*
len
(
self
.
kv_cache_config
.
kv_cache_groups
)
if
self
.
running
:
any_request
=
self
.
running
[
0
]
num_common_prefix_blocks
=
(
self
.
kv_cache_manager
.
get_num_common_prefix_blocks
(
any_request
,
len
(
self
.
running
)))
grammar_bitmask
=
self
.
structured_output_manager
.
grammar_bitmask
(
self
.
requests
,
structured_output_request_ids
,
scheduled_spec_decode_tokens
,
)
# Construct the scheduler output.
new_reqs_data
=
[
NewRequestData
.
from_request
(
req
,
req_to_new_block_ids
[
req
.
request_id
])
for
req
in
scheduled_new_reqs
]
cached_reqs_data
=
self
.
_make_cached_request_data
(
scheduled_running_reqs
,
scheduled_resumed_reqs
,
num_scheduled_tokens
,
scheduled_spec_decode_tokens
,
req_to_new_block_ids
,
)
scheduler_output
=
SchedulerOutput
(
scheduled_new_reqs
=
new_reqs_data
,
scheduled_cached_reqs
=
cached_reqs_data
,
num_scheduled_tokens
=
num_scheduled_tokens
,
total_num_scheduled_tokens
=
total_num_scheduled_tokens
,
scheduled_spec_decode_tokens
=
scheduled_spec_decode_tokens
,
scheduled_encoder_inputs
=
scheduled_encoder_inputs
,
num_common_prefix_blocks
=
num_common_prefix_blocks
,
# finished_req_ids is an existing state in the scheduler,
# instead of being newly scheduled in this step.
# It contains the request IDs that are finished in between
# the previous and the current steps.
finished_req_ids
=
self
.
finished_req_ids
,
free_encoder_input_ids
=
self
.
encoder_cache_manager
.
get_freed_ids
(),
structured_output_request_ids
=
structured_output_request_ids
,
grammar_bitmask
=
grammar_bitmask
,
)
# NOTE(Kuntai): this function is designed for multiple purposes:
# 1. Plan the KV cache store
# 2. Wrap up all the KV cache load / save ops into an opaque object
# 3. Clear the internal states of the connector
if
self
.
connector
is
not
None
:
meta
=
self
.
connector
.
build_connector_meta
(
scheduler_output
)
scheduler_output
.
kv_connector_metadata
=
meta
events
=
self
.
kv_cache_manager
.
take_events
()
if
events
:
batch
=
KVEventBatch
(
ts
=
time
.
time
(),
events
=
events
)
self
.
kv_event_publisher
.
publish
(
batch
)
self
.
_update_after_schedule
(
scheduler_output
)
return
scheduler_output
def
schedule
(
self
)
->
SchedulerOutput
:
if
self
.
num_spec_tokens
>
0
:
return
self
.
schedule_split_pd
()
else
:
return
self
.
schedule_default
()
def
_update_after_schedule
(
def
_update_after_schedule
(
self
,
self
,
...
...
vllm/v1/worker/gpu_model_runner.py
View file @
00e13357
...
@@ -2091,6 +2091,10 @@ class GPUModelRunner(LoRAModelRunnerMixin):
...
@@ -2091,6 +2091,10 @@ class GPUModelRunner(LoRAModelRunnerMixin):
max_num_reqs
=
self
.
scheduler_config
.
max_num_seqs
max_num_reqs
=
self
.
scheduler_config
.
max_num_seqs
num_reqs
=
min
(
num_tokens
,
max_num_reqs
)
num_reqs
=
min
(
num_tokens
,
max_num_reqs
)
min_tokens_per_req
=
num_tokens
//
num_reqs
min_tokens_per_req
=
num_tokens
//
num_reqs
if
not
is_profile
and
self
.
speculative_config
is
not
None
and
self
.
speculative_config
.
num_lookahead_slots
>
0
:
min_tokens_per_req
=
(
1
+
self
.
speculative_config
.
num_lookahead_slots
)
num_reqs
=
num_tokens
//
min_tokens_per_req
num_scheduled_tokens_list
=
[
min_tokens_per_req
]
*
num_reqs
num_scheduled_tokens_list
=
[
min_tokens_per_req
]
*
num_reqs
num_scheduled_tokens_list
[
-
1
]
+=
num_tokens
%
num_reqs
num_scheduled_tokens_list
[
-
1
]
+=
num_tokens
%
num_reqs
assert
sum
(
num_scheduled_tokens_list
)
==
num_tokens
assert
sum
(
num_scheduled_tokens_list
)
==
num_tokens
...
@@ -2107,6 +2111,8 @@ class GPUModelRunner(LoRAModelRunnerMixin):
...
@@ -2107,6 +2111,8 @@ class GPUModelRunner(LoRAModelRunnerMixin):
self
.
seq_lens_np
[
num_reqs
:]
=
0
self
.
seq_lens_np
[
num_reqs
:]
=
0
self
.
seq_lens
[:
num_reqs
].
copy_
(
self
.
seq_lens_cpu
[:
num_reqs
],
self
.
seq_lens
[:
num_reqs
].
copy_
(
self
.
seq_lens_cpu
[:
num_reqs
],
non_blocking
=
True
)
non_blocking
=
True
)
num_speculative_tokens
=
0
if
self
.
speculative_config
is
None
else
self
.
speculative_config
.
num_lookahead_slots
for
kv_cache_group_id
,
kv_cache_group_spec
in
enumerate
(
for
kv_cache_group_id
,
kv_cache_group_spec
in
enumerate
(
self
.
kv_cache_config
.
kv_cache_groups
):
self
.
kv_cache_config
.
kv_cache_groups
):
...
@@ -2121,6 +2127,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
...
@@ -2121,6 +2127,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
num_reqs
=
num_reqs
,
num_reqs
=
num_reqs
,
num_actual_tokens
=
num_tokens
,
num_actual_tokens
=
num_tokens
,
max_query_len
=
num_tokens
,
max_query_len
=
num_tokens
,
num_speculative_tokens
=
num_speculative_tokens
,
block_table_tensor
=
self
.
input_batch
.
block_table
[
block_table_tensor
=
self
.
input_batch
.
block_table
[
kv_cache_group_id
].
get_device_tensor
()[:
num_reqs
],
kv_cache_group_id
].
get_device_tensor
()[:
num_reqs
],
slot_mapping
=
self
.
input_batch
.
slot_mapping
=
self
.
input_batch
.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment