Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
9ef6f50a
Commit
9ef6f50a
authored
Feb 08, 2026
by
王敏
Browse files
[feat]支持prefill和decode调度分离
parent
e807ec39
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
597 additions
and
1 deletion
+597
-1
vllm/v1/core/sched/scheduler.py
vllm/v1/core/sched/scheduler.py
+597
-1
No files found.
vllm/v1/core/sched/scheduler.py
View file @
9ef6f50a
...
...
@@ -12,6 +12,7 @@ import numpy as np
from
vllm
import
envs
from
vllm.compilation.cuda_graph
import
CUDAGraphStat
from
vllm.config
import
VllmConfig
from
vllm.config.compilation
import
CUDAGraphMode
from
vllm.distributed.ec_transfer.ec_connector.base
import
(
ECConnectorMetadata
,
ECConnectorRole
,
...
...
@@ -213,6 +214,10 @@ class Scheduler(SchedulerInterface):
if
speculative_config
.
uses_draft_model
():
self
.
num_lookahead_tokens
=
self
.
num_spec_tokens
self
.
compilation_config
=
vllm_config
.
compilation_config
self
.
full_cuda_graph
=
self
.
compilation_config
.
cudagraph_mode
==
CUDAGraphMode
.
FULL
self
.
use_mla
=
vllm_config
.
model_config
.
use_mla
# Create the KV cache manager.
self
.
kv_cache_manager
=
KVCacheManager
(
kv_cache_config
=
kv_cache_config
,
...
...
@@ -310,7 +315,7 @@ class Scheduler(SchedulerInterface):
pass
return
num_new_tokens
def
schedule
(
self
)
->
SchedulerOutput
:
def
schedule
_default
(
self
)
->
SchedulerOutput
:
# NOTE(woosuk) on the scheduling algorithm:
# There's no "decoding phase" nor "prefill phase" in the scheduler.
# Each request just has the num_computed_tokens and
...
...
@@ -888,6 +893,597 @@ class Scheduler(SchedulerInterface):
self
.
_update_after_schedule
(
scheduler_output
)
return
scheduler_output
def schedule_split_pd(self) -> SchedulerOutput:
    """Run one scheduling step with prefill and decode kept in separate steps.

    The WAITING queue (new and resumed requests, i.e. prefill work) is
    scheduled first. The RUNNING requests are scheduled only when that pass
    admitted no new or resumed request, so a single step carries either
    newly admitted requests or already-running requests, never both.

    Returns:
        The SchedulerOutput describing the requests, token counts, KV blocks,
        encoder inputs and connector metadata scheduled for this step.
    """
    # NOTE: Each request just has the num_computed_tokens and
    # num_tokens_with_spec. num_tokens_with_spec =
    # len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids).
    # At each step, the scheduler tries to assign tokens to the requests
    # so that each request's num_computed_tokens can catch up its
    # num_tokens_with_spec. Unlike the default scheduler, WAITING requests
    # are served first and RUNNING requests only get a turn when no WAITING
    # request was admitted in this step.

    scheduled_new_reqs: list[Request] = []
    scheduled_resumed_reqs: list[Request] = []
    scheduled_running_reqs: list[Request] = []
    preempted_reqs: list[Request] = []

    req_to_new_blocks: dict[str, KVCacheBlocks] = {}
    num_scheduled_tokens: dict[str, int] = {}
    token_budget = self.max_num_scheduled_tokens

    # Encoder-related.
    scheduled_encoder_inputs: dict[str, list[int]] = {}
    encoder_compute_budget = self.max_num_encoder_input_tokens
    # Spec decode-related.
    scheduled_spec_decode_tokens: dict[str, list[int]] = {}

    # For logging.
    scheduled_timestamp = time.monotonic()

    # LoRAs that count against max_loras while admitting WAITING requests.
    # This set must exist *before* the WAITING loop reads it; previously it
    # was only assigned after the RUNNING loop, which raised
    # UnboundLocalError for any waiting LoRA request. It is seeded with the
    # adapters of the already-running requests so that a later decode-only
    # step cannot end up with more than max_loras distinct adapters.
    scheduled_loras: set[int] = set()
    if self.lora_config:
        scheduled_loras = {
            req.lora_request.lora_int_id
            for req in self.running
            if req.lora_request and req.lora_request.lora_int_id > 0
        }

    # Use a temporary RequestQueue to collect requests that need to be
    # skipped and put back at the head of the waiting queue later
    skipped_waiting_requests = create_request_queue(self.policy)

    # First, schedule the WAITING requests.
    while self.waiting and token_budget > 0:
        if len(self.running) == self.max_num_running_reqs:
            break

        request = self.waiting.peek_request()

        # KVTransfer: skip request if still waiting for remote kvs.
        if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
            is_ready = self._update_waiting_for_remote_kv(request)
            if is_ready:
                if request.num_preemptions:
                    # We must be loading for a resumed preemption
                    # rather than a new request.
                    request.status = RequestStatus.PREEMPTED
                else:
                    request.status = RequestStatus.WAITING
            else:
                logger.debug(
                    "%s is still in WAITING_FOR_REMOTE_KVS state.",
                    request.request_id,
                )
                self.waiting.pop_request()
                skipped_waiting_requests.prepend_request(request)
                continue

        # Skip request if the structured output request is still waiting
        # for FSM compilation.
        if request.status == RequestStatus.WAITING_FOR_FSM:
            structured_output_req = request.structured_output_request
            if structured_output_req and structured_output_req.grammar:
                request.status = RequestStatus.WAITING
            else:
                self.waiting.pop_request()
                skipped_waiting_requests.prepend_request(request)
                continue

        # Streaming: skip request if still waiting for next streaming req.
        if request.status == RequestStatus.WAITING_FOR_STREAMING_REQ:
            assert not request.streaming_queue
            self.waiting.pop_request()
            skipped_waiting_requests.prepend_request(request)
            continue

        # Check that adding the request still respects the max_loras
        # constraint.
        if (
            self.lora_config
            and request.lora_request
            and (
                len(scheduled_loras) == self.lora_config.max_loras
                and request.lora_request.lora_int_id not in scheduled_loras
            )
        ):
            # Scheduling would exceed max_loras, skip.
            self.waiting.pop_request()
            skipped_waiting_requests.prepend_request(request)
            continue

        num_external_computed_tokens = 0
        load_kv_async = False

        # Get already-cached tokens.
        if request.num_computed_tokens == 0:
            # Get locally-cached tokens.
            new_computed_blocks, num_new_local_computed_tokens = (
                self.kv_cache_manager.get_computed_blocks(request)
            )

            # Get externally-cached tokens if using a KVConnector.
            if self.connector is not None:
                ext_tokens, load_kv_async = (
                    self.connector.get_num_new_matched_tokens(
                        request, num_new_local_computed_tokens
                    )
                )

                if ext_tokens is None:
                    # The request cannot be scheduled because
                    # the KVConnector couldn't determine
                    # the number of matched tokens.
                    self.waiting.pop_request()
                    skipped_waiting_requests.prepend_request(request)
                    continue

                request.num_external_computed_tokens = ext_tokens
                num_external_computed_tokens = ext_tokens

            # Total computed tokens (local + external).
            num_computed_tokens = (
                num_new_local_computed_tokens + num_external_computed_tokens
            )
        else:
            # KVTransfer: WAITING reqs have num_computed_tokens > 0
            # after async KV recvs are completed.
            new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks
            num_new_local_computed_tokens = 0
            num_computed_tokens = request.num_computed_tokens

        encoder_inputs_to_schedule = None
        external_load_encoder_input: list[int] = []
        new_encoder_compute_budget = encoder_compute_budget

        if load_kv_async:
            # KVTransfer: loading remote KV, do not allocate for new work.
            assert num_external_computed_tokens > 0
            num_new_tokens = 0
        else:
            # Number of tokens to be scheduled.
            # We use `request.num_tokens` instead of
            # `request.num_prompt_tokens` to consider the resumed
            # requests, which have output tokens.
            num_new_tokens = request.num_tokens - num_computed_tokens
            threshold = self.scheduler_config.long_prefill_token_threshold
            if 0 < threshold < num_new_tokens:
                num_new_tokens = threshold

            # chunked prefill has to be enabled explicitly to allow
            # pooling requests to be chunked
            if (
                not self.scheduler_config.enable_chunked_prefill
                and num_new_tokens > token_budget
            ):
                # If chunked_prefill is disabled,
                # we can stop the scheduling here.
                break

            num_new_tokens = min(num_new_tokens, token_budget)
            assert num_new_tokens > 0

            # Schedule encoder inputs.
            if request.has_encoder_inputs:
                (
                    encoder_inputs_to_schedule,
                    num_new_tokens,
                    new_encoder_compute_budget,
                    external_load_encoder_input,
                ) = self._try_schedule_encoder_inputs(
                    request,
                    num_computed_tokens,
                    num_new_tokens,
                    encoder_compute_budget,
                    shift_computed_tokens=1 if self.use_eagle else 0,
                )
                if num_new_tokens == 0:
                    # The request cannot be scheduled.
                    break

        if self.need_mamba_block_aligned_split:
            num_new_tokens = self._mamba_block_aligned_split(
                request,
                num_new_tokens,
                num_new_local_computed_tokens,
                num_external_computed_tokens,
            )
            if num_new_tokens == 0:
                break

        # Handles an edge case when P/D Disaggregation
        # is used with Spec Decoding where an
        # extra block gets allocated which
        # creates a mismatch between the number
        # of local and remote blocks.
        effective_lookahead_tokens = (
            0 if request.num_computed_tokens == 0 else self.num_lookahead_tokens
        )

        num_encoder_tokens = (
            self._num_encoder_max_input_tokens
            if self.is_encoder_decoder and request.has_encoder_inputs
            else 0
        )

        new_blocks = self.kv_cache_manager.allocate_slots(
            request,
            num_new_tokens,
            num_new_computed_tokens=num_new_local_computed_tokens,
            new_computed_blocks=new_computed_blocks,
            num_lookahead_tokens=effective_lookahead_tokens,
            num_external_computed_tokens=num_external_computed_tokens,
            delay_cache_blocks=load_kv_async,
            num_encoder_tokens=num_encoder_tokens,
        )

        if new_blocks is None:
            # The request cannot be scheduled.

            # NOTE: we need to untouch the request from the encode cache
            # manager
            if request.has_encoder_inputs:
                self.encoder_cache_manager.free(request)
            break

        # KVTransfer: the connector uses the allocated blocks and the
        # number of externally computed tokens to determine whether a
        # load is needed for this request.
        if self.connector is not None:
            self.connector.update_state_after_alloc(
                request,
                self.kv_cache_manager.get_blocks(request.request_id),
                num_external_computed_tokens,
            )

        # The request was only peeked so far; remove it from the waiting
        # queue now that its slots have been allocated.
        request = self.waiting.pop_request()
        if load_kv_async:
            # If loading async, allocate memory and put request
            # into the WAITING_FOR_REMOTE_KV state.
            skipped_waiting_requests.prepend_request(request)
            request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
            continue

        self._update_connector_prefix_cache_stats(request)

        self.running.append(request)
        if self.log_stats:
            request.record_event(EngineCoreEventType.SCHEDULED, scheduled_timestamp)
        if request.status == RequestStatus.WAITING:
            scheduled_new_reqs.append(request)
        elif request.status == RequestStatus.PREEMPTED:
            scheduled_resumed_reqs.append(request)
        else:
            raise RuntimeError(f"Invalid request status: {request.status}")

        if self.lora_config and request.lora_request:
            scheduled_loras.add(request.lora_request.lora_int_id)
        req_to_new_blocks[request.request_id] = self.kv_cache_manager.get_blocks(
            request.request_id
        )
        num_scheduled_tokens[request.request_id] = num_new_tokens
        token_budget -= num_new_tokens
        request.status = RequestStatus.RUNNING
        request.num_computed_tokens = num_computed_tokens
        # Count the number of prefix cached tokens.
        if request.num_cached_tokens < 0:
            request.num_cached_tokens = num_computed_tokens
        # Encoder-related.
        if encoder_inputs_to_schedule:
            scheduled_encoder_inputs[request.request_id] = encoder_inputs_to_schedule
            # Allocate the encoder cache.
            for i in encoder_inputs_to_schedule:
                self.encoder_cache_manager.allocate(request, i)
            encoder_compute_budget = new_encoder_compute_budget
        # Allocate for external load encoder cache
        if external_load_encoder_input:
            for i in external_load_encoder_input:
                self.encoder_cache_manager.allocate(request, i)
                if self.ec_connector is not None:
                    self.ec_connector.update_state_after_alloc(request, i)

    # Put back any skipped requests at the head of the waiting queue
    if skipped_waiting_requests:
        self.waiting.prepend_requests(skipped_waiting_requests)

    # Next, schedule the RUNNING requests, but only when this step admitted
    # no new or resumed request. Requests appended to self.running above
    # already carry their token counts for this step and must not be
    # visited again here.
    if not scheduled_new_reqs and not scheduled_resumed_reqs:
        req_index = 0
        while req_index < len(self.running) and token_budget > 0:
            request = self.running[req_index]

            # do not schedule another step for the same request while it still has
            # output placeholders for PP.
            # TODO: support PP + async scheduling without this limit
            if self.use_pp and request.num_output_placeholders > 0:
                req_index += 1
                continue

            if (
                request.num_output_placeholders > 0
                # This is (num_computed_tokens + 1) - (num_output_placeholders - 1).
                # Since output placeholders are also included in the computed tokens
                # count, we subtract (num_output_placeholders - 1) to remove any draft
                # tokens, so that we can be sure no further steps are needed even if
                # they are all rejected.
                and request.num_computed_tokens + 2 - request.num_output_placeholders
                >= request.num_prompt_tokens + request.max_tokens
            ):
                # Async scheduling: Avoid scheduling an extra step when we are sure that
                # the previous step has reached request.max_tokens. We don't schedule
                # partial draft tokens since this prevents uniform decode optimizations.
                req_index += 1
                continue

            num_new_tokens = (
                request.num_tokens_with_spec
                + request.num_output_placeholders
                - request.num_computed_tokens
            )
            if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
                num_new_tokens = self.scheduler_config.long_prefill_token_threshold
            num_new_tokens = min(num_new_tokens, token_budget)

            # Make sure the input position does not exceed the max model len.
            # This is necessary when using spec decoding.
            num_new_tokens = min(
                num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens
            )

            # Schedule encoder inputs.
            encoder_inputs_to_schedule = None
            external_load_encoder_input = []
            new_encoder_compute_budget = encoder_compute_budget
            if request.has_encoder_inputs:
                (
                    encoder_inputs_to_schedule,
                    num_new_tokens,
                    new_encoder_compute_budget,
                    external_load_encoder_input,
                ) = self._try_schedule_encoder_inputs(
                    request,
                    request.num_computed_tokens,
                    num_new_tokens,
                    encoder_compute_budget,
                    shift_computed_tokens=1 if self.use_eagle else 0,
                )

            if self.need_mamba_block_aligned_split:
                num_new_tokens = self._mamba_block_aligned_split(
                    request, num_new_tokens
                )

            if num_new_tokens == 0:
                # The request cannot be scheduled because one of the following
                # reasons:
                # 1. No new tokens to schedule. This may happen when
                #    (1) PP>1 and we have already scheduled all prompt tokens
                #    but they are not finished yet.
                #    (2) Async scheduling and the request has reached to either
                #    its max_total_tokens or max_model_len.
                # 2. The encoder budget is exhausted.
                # 3. The encoder cache is exhausted.
                # 4. Insufficient budget for a block-aligned chunk in hybrid
                #    models with mamba cache mode "align".
                # NOTE(woosuk): Here, by doing `continue` instead of `break`,
                # we do not strictly follow the FCFS scheduling policy and
                # allow the lower-priority requests to be scheduled.
                req_index += 1
                continue

            # Schedule newly needed KV blocks for the request.
            with record_function_or_nullcontext("schedule: allocate_slots"):
                while True:
                    new_blocks = self.kv_cache_manager.allocate_slots(
                        request,
                        num_new_tokens,
                        num_lookahead_tokens=self.num_lookahead_tokens,
                    )

                    if new_blocks is not None:
                        # The request can be scheduled.
                        break

                    # The request cannot be scheduled.
                    # Preempt the lowest-priority request.
                    if self.policy == SchedulingPolicy.PRIORITY:
                        preempted_req = max(
                            self.running,
                            key=lambda r: (r.priority, r.arrival_time),
                        )
                        self.running.remove(preempted_req)
                        if preempted_req in scheduled_running_reqs:
                            # Undo everything this step had granted to the
                            # preempted request.
                            scheduled_running_reqs.remove(preempted_req)
                            token_budget += num_scheduled_tokens[
                                preempted_req.request_id
                            ]
                            req_to_new_blocks.pop(preempted_req.request_id)
                            num_scheduled_tokens.pop(preempted_req.request_id)
                            scheduled_spec_decode_tokens.pop(
                                preempted_req.request_id, None
                            )
                            preempted_encoder_inputs = scheduled_encoder_inputs.pop(
                                preempted_req.request_id, None
                            )
                            if preempted_encoder_inputs:
                                # Restore encoder compute budget if the preempted
                                # request had encoder inputs scheduled in this step.
                                num_embeds_to_restore = sum(
                                    preempted_req.get_num_encoder_embeds(i)
                                    for i in preempted_encoder_inputs
                                )
                                encoder_compute_budget += num_embeds_to_restore
                            req_index -= 1
                    else:
                        preempted_req = self.running.pop()

                    self._preempt_request(preempted_req, scheduled_timestamp)
                    preempted_reqs.append(preempted_req)
                    if preempted_req == request:
                        # No more request to preempt. Cannot schedule this request.
                        break

            if new_blocks is None:
                # Cannot schedule this request.
                break

            # Schedule the request.
            scheduled_running_reqs.append(request)
            req_to_new_blocks[request.request_id] = new_blocks
            num_scheduled_tokens[request.request_id] = num_new_tokens
            token_budget -= num_new_tokens
            req_index += 1

            # Speculative decode related.
            if request.spec_token_ids:
                num_scheduled_spec_tokens = (
                    num_new_tokens
                    + request.num_computed_tokens
                    - request.num_tokens
                    - request.num_output_placeholders
                )
                if num_scheduled_spec_tokens > 0:
                    # Trim spec_token_ids list to num_scheduled_spec_tokens.
                    del request.spec_token_ids[num_scheduled_spec_tokens:]
                    scheduled_spec_decode_tokens[request.request_id] = (
                        request.spec_token_ids
                    )
                # New spec tokens will be set in `update_draft_token_ids` before the
                # next step when applicable.
                request.spec_token_ids = []

            # Encoder-related.
            if encoder_inputs_to_schedule:
                scheduled_encoder_inputs[request.request_id] = (
                    encoder_inputs_to_schedule
                )
                # Allocate the encoder cache.
                for i in encoder_inputs_to_schedule:
                    self.encoder_cache_manager.allocate(request, i)
                encoder_compute_budget = new_encoder_compute_budget
            if external_load_encoder_input:
                for i in external_load_encoder_input:
                    self.encoder_cache_manager.allocate(request, i)
                    if self.ec_connector is not None:
                        self.ec_connector.update_state_after_alloc(request, i)

    # Sanity-check the LoRAs of the running requests scheduled in this step.
    if self.lora_config:
        running_loras = {
            req.lora_request.lora_int_id
            for req in scheduled_running_reqs
            if req.lora_request and req.lora_request.lora_int_id > 0
        }
        assert len(running_loras) <= self.lora_config.max_loras

    # Check if the scheduling constraints are satisfied.
    total_num_scheduled_tokens = sum(num_scheduled_tokens.values())
    assert total_num_scheduled_tokens <= self.max_num_scheduled_tokens

    assert token_budget >= 0
    assert len(self.running) <= self.max_num_running_reqs
    # Since some requests in the RUNNING queue may not be scheduled in
    # this step, the total number of scheduled requests can be smaller than
    # len(self.running).
    assert len(scheduled_new_reqs) + len(scheduled_resumed_reqs) + len(
        scheduled_running_reqs
    ) <= len(self.running)

    # Get the longest common prefix among all requests in the running queue.
    # This can be potentially used for cascade attention.
    num_common_prefix_blocks = [0] * len(self.kv_cache_config.kv_cache_groups)
    with record_function_or_nullcontext("schedule: get_num_common_prefix_blocks"):
        if self.running:
            any_request = self.running[0]
            num_common_prefix_blocks = (
                self.kv_cache_manager.get_num_common_prefix_blocks(
                    any_request.request_id
                )
            )

    # Construct the scheduler output.
    if self.use_v2_model_runner:
        # The v2 model runner receives resumed requests as new requests.
        scheduled_new_reqs = scheduled_new_reqs + scheduled_resumed_reqs
        scheduled_resumed_reqs = []
        new_reqs_data = [
            NewRequestData.from_request(
                req,
                req_to_new_blocks[req.request_id].get_block_ids(),
                req._all_token_ids,
            )
            for req in scheduled_new_reqs
        ]
    else:
        new_reqs_data = [
            NewRequestData.from_request(
                req, req_to_new_blocks[req.request_id].get_block_ids()
            )
            for req in scheduled_new_reqs
        ]

    with record_function_or_nullcontext("schedule: make_cached_request_data"):
        cached_reqs_data = self._make_cached_request_data(
            scheduled_running_reqs,
            scheduled_resumed_reqs,
            num_scheduled_tokens,
            scheduled_spec_decode_tokens,
            req_to_new_blocks,
        )

    # Record the request ids that were scheduled in this step.
    self.prev_step_scheduled_req_ids.clear()
    self.prev_step_scheduled_req_ids.update(num_scheduled_tokens.keys())

    scheduler_output = SchedulerOutput(
        scheduled_new_reqs=new_reqs_data,
        scheduled_cached_reqs=cached_reqs_data,
        num_scheduled_tokens=num_scheduled_tokens,
        total_num_scheduled_tokens=total_num_scheduled_tokens,
        scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
        scheduled_encoder_inputs=scheduled_encoder_inputs,
        num_common_prefix_blocks=num_common_prefix_blocks,
        preempted_req_ids={req.request_id for req in preempted_reqs},
        # finished_req_ids is an existing state in the scheduler,
        # instead of being newly scheduled in this step.
        # It contains the request IDs that are finished in between
        # the previous and the current steps.
        finished_req_ids=self.finished_req_ids,
        free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(),
    )

    # NOTE(Kuntai): this function is designed for multiple purposes:
    # 1. Plan the KV cache store
    # 2. Wrap up all the KV cache load / save ops into an opaque object
    # 3. Clear the internal states of the connector
    if self.connector is not None:
        meta: KVConnectorMetadata = self.connector.build_connector_meta(
            scheduler_output
        )
        scheduler_output.kv_connector_metadata = meta

    # Build the connector meta for ECConnector
    if self.ec_connector is not None:
        ec_meta: ECConnectorMetadata = self.ec_connector.build_connector_meta(
            scheduler_output
        )
        scheduler_output.ec_connector_metadata = ec_meta

    with record_function_or_nullcontext("schedule: update_after_schedule"):
        self._update_after_schedule(scheduler_output)
    return scheduler_output
def schedule(self) -> SchedulerOutput:
    """Run one scheduling step with the scheduler variant selected by config.

    The split prefill/decode scheduler is used only when
    ``envs.VLLM_USE_PD_SPLIT`` is set and either the model does not use MLA,
    or it uses MLA together with full CUDA graphs and at least one
    speculative token. Every other configuration uses the default scheduler.

    Returns:
        The SchedulerOutput produced by the selected scheduling routine.
    """
    if not envs.VLLM_USE_PD_SPLIT:
        return self.schedule_default()

    # With MLA, the split path is only taken for full CUDA graphs combined
    # with speculative decoding; otherwise fall back to the default path.
    mla_requires_default = self.use_mla and not (
        self.full_cuda_graph and self.num_spec_tokens > 0
    )
    if mla_requires_default:
        return self.schedule_default()

    return self.schedule_split_pd()
def
_preempt_request
(
self
,
request
:
Request
,
timestamp
:
float
)
->
None
:
"""Preempt a request and put it back to the waiting queue.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment