Commit 57c0544e authored by zhuwenwen's avatar zhuwenwen
Browse files

feat: 添加输出占位符功能以优化调度

- 在环境变量中引入 `VLLM_ENABLE_OUTPUT_PLACEHOLDERS` 以控制输出占位符的启用。
- 在 `Request` 类中增加 `num_output_placeholders` 属性,用于跟踪预计生成的 token 数量。
- 更新调度逻辑以利用输出占位符,确保在解码过程中避免 0-token 停滞。
- 移除不再使用的最小进度注入相关代码,简化调度器实现。
parent 4886cbdb
......@@ -175,6 +175,7 @@ if TYPE_CHECKING:
USE_FUSED_SILU_MUL_QUANT: bool = False
VLLM_P2P_ASYNC: bool = False
VLLM_P2P_BUF_TOKENS: int = 30000
VLLM_ENABLE_OUTPUT_PLACEHOLDERS: bool = False
def get_default_cache_root():
return os.getenv(
......@@ -1142,6 +1143,9 @@ environment_variables: dict[str, Callable[[], Any]] = {
# pd separation p2p async buf tokens
"VLLM_P2P_BUF_TOKENS":
lambda: int(os.getenv("VLLM_P2P_BUF_TOKENS", "30000")),
# vllm will enable output placeholders
"VLLM_ENABLE_OUTPUT_PLACEHOLDERS":
lambda: bool(int(os.getenv("VLLM_ENABLE_OUTPUT_PLACEHOLDERS", "0"))),
}
# --8<-- [end:env-vars-definition]
......
......@@ -34,6 +34,7 @@ from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.spec_decode.metrics import SpecDecodingStats
from vllm.v1.structured_output import StructuredOutputManager
from vllm import envs
logger = init_logger(__name__)
......@@ -74,6 +75,8 @@ class Scheduler(SchedulerInterface):
self.enable_kv_cache_events = (
self.kv_events_config is not None
and self.kv_events_config.enable_kv_cache_events)
# Cache the output placeholders feature flag to avoid repeated envs access
self.enable_output_placeholders = envs.is_set("VLLM_ENABLE_OUTPUT_PLACEHOLDERS")
# Create KVConnector for the Scheduler. Note that each Worker
# will have a corresponding KVConnector with Role=WORKER.
......@@ -205,7 +208,11 @@ class Scheduler(SchedulerInterface):
while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index]
num_new_tokens = (request.num_tokens_with_spec -
# Include output placeholders so that decode keeps a minimal
# backlog, avoiding 0-token plateaus when resuming from WAITING
# or immediately after prefill finishes.
num_new_tokens = (request.num_tokens_with_spec +
request.num_output_placeholders -
request.num_computed_tokens)
if (0 < self.scheduler_config.long_prefill_token_threshold <
num_new_tokens):
......@@ -236,9 +243,7 @@ class Scheduler(SchedulerInterface):
# not finished yet.
# 2. The encoder budget is exhausted.
# 3. The encoder cache is exhausted.
# NOTE(woosuk): Here, by doing `continue` instead of `break`,
# we do not strictly follow the FCFS scheduling policy and
# allow the lower-priority requests to be scheduled.
# Allow lower-priority requests to be scheduled in this step.
req_index += 1
continue
......@@ -806,7 +811,11 @@ class Scheduler(SchedulerInterface):
while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index]
num_new_tokens = (request.num_tokens_with_spec -
# Include output placeholders to maintain a minimal backlog
# for decode, avoiding 0-token plateaus after prefill or when
# resuming from WAITING under PP/speculative paths.
num_new_tokens = (request.num_tokens_with_spec +
request.num_output_placeholders -
request.num_computed_tokens)
if (0 < self.scheduler_config.long_prefill_token_threshold <
num_new_tokens):
......@@ -1026,6 +1035,16 @@ class Scheduler(SchedulerInterface):
request = self.requests[req_id]
request.num_computed_tokens += num_scheduled_token
# 在推进已计算的 token 数量之后,对于本步骤将生成新 token 的请求,
# 增加一个输出占位符。这将创建一个小的赤字,确保即使在赤字本来为0的边界处,
# 随后的调度步骤也能继续安排解码任务。
if self.enable_output_placeholders:
for req_id in scheduler_output.num_scheduled_tokens:
request = self.requests[req_id]
if (request.num_computed_tokens
== request.num_tokens + request.num_output_placeholders):
request.num_output_placeholders += 1
# Clear the finished request IDs.
# NOTE: We shouldn't do self.finished_req_ids.clear() here because
# it will also affect the scheduler output.
......@@ -1245,6 +1264,12 @@ class Scheduler(SchedulerInterface):
# the outer lists can be of length > 1.
new_logprobs = logprobs.slice(req_index, req_index + 1)
# 按照本步实际生成的token数量(如果有),减少输出占位符的数量。
if self.enable_output_placeholders:
if new_token_ids:
request.num_output_placeholders = max(
0, request.num_output_placeholders - len(new_token_ids))
if new_token_ids and self.structured_output_manager.should_advance(
request):
# NOTE: structured_output_request
......
......@@ -82,6 +82,12 @@ class Request:
self.num_generated_token_ids = 0
self.cache_salt: Optional[str] = cache_salt
# 用于异步调度,确保解码过程能够持续推进:
# “输出占位符”的数量,表示我们预计即将生成的token数量。
# 其作用类似一个小的积压队列,用来避免在从WAITING状态恢复或解码初始时,
# 因待生成token数量为0而导致调度停滞的情况。
self.num_output_placeholders = 0
# Multi-modal related
self.mm_positions = multi_modal_placeholders or []
self.mm_inputs = multi_modal_inputs or []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment