Commit 613edd7d authored by zhuwenwen
Browse files

feat: 添加输出占位符功能以优化调度

- 在环境变量中引入 `VLLM_SCHED_ENABLE_MINIMAL_INJECTION` 以控制流水线并行调度的最小注入。
- 调整 Scheduler 逻辑以使用新的最小注入功能。
- 更新调度逻辑以利用输出占位符,确保在解码过程中避免 0-token 停滞。
- 增强 Scheduler,根据批次队列状态管理最小进度注入。
parent 7fdcfde2
......@@ -175,6 +175,7 @@ if TYPE_CHECKING:
USE_FUSED_SILU_MUL_QUANT: bool = False
VLLM_P2P_ASYNC: bool = False
VLLM_P2P_BUF_TOKENS: int = 30000
VLLM_SCHED_ENABLE_MINIMAL_INJECTION: bool = False
def get_default_cache_root():
return os.getenv(
......@@ -1151,6 +1152,11 @@ environment_variables: dict[str, Callable[[], Any]] = {
# pd separation p2p async buf tokens
"VLLM_P2P_BUF_TOKENS":
lambda: int(os.getenv("VLLM_P2P_BUF_TOKENS", "30000")),
# If set to "1" or "true", the scheduler may inject a minimal one-token
# step when pipeline parallelism is in use (disabled by default)
"VLLM_SCHED_ENABLE_MINIMAL_INJECTION":
lambda: (os.getenv("VLLM_SCHED_ENABLE_MINIMAL_INJECTION", "0").lower() in
("true", "1")),
}
# --8<-- [end:env-vars-definition]
......
......@@ -148,3 +148,6 @@ class SchedulerInterface(ABC):
def get_kv_connector(self) -> Optional["KVConnectorBase_V1"]:
    """Return the KV connector associated with this scheduler.

    This default implementation has no connector and always yields
    ``None``.
    """
    connector = None
    return connector
def set_allow_minimal_injection(self, allow: bool) -> None:
    """Accept a hint on whether minimal injection is currently allowed.

    This default implementation is a no-op: the ``allow`` value is
    ignored and nothing is stored.

    Args:
        allow: Whether minimal injection may be attempted.
    """
    return None
......@@ -34,6 +34,7 @@ from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.spec_decode.metrics import SpecDecodingStats
from vllm.v1.structured_output import StructuredOutputManager
from vllm import envs
logger = init_logger(__name__)
......@@ -74,6 +75,7 @@ class Scheduler(SchedulerInterface):
self.enable_kv_cache_events = (
self.kv_events_config is not None
and self.kv_events_config.enable_kv_cache_events)
self.enable_minimal_injection = envs.VLLM_SCHED_ENABLE_MINIMAL_INJECTION
# Create KVConnector for the Scheduler. Note that each Worker
# will have a corresponding KVConnector with Role=WORKER.
......@@ -166,6 +168,11 @@ class Scheduler(SchedulerInterface):
enable_kv_cache_events=self.enable_kv_cache_events,
)
self.use_pp = self.parallel_config.pipeline_parallel_size > 1
self.allow_minimal_injection: bool = False
# Engine hint to gate minimal-progress injection (0-token breaker)
def set_allow_minimal_injection(self, allow: bool) -> None:
    """Store the hint that gates minimal-progress injection.

    The value is kept on ``self.allow_minimal_injection`` exactly as
    given, with no validation or coercion.

    Args:
        allow: Whether minimal-progress injection may be attempted.
    """
    self.allow_minimal_injection = allow
    return None
def schedule_default(self) -> SchedulerOutput:
# NOTE(woosuk) on the scheduling algorithm:
......@@ -209,7 +216,7 @@ class Scheduler(SchedulerInterface):
while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index]
num_new_tokens = (request.num_tokens_with_spec -
num_new_tokens = (request.num_tokens_with_spec -
request.num_computed_tokens)
if (0 < self.scheduler_config.long_prefill_token_threshold <
num_new_tokens):
......@@ -240,11 +247,21 @@ class Scheduler(SchedulerInterface):
# not finished yet.
# 2. The encoder budget is exhausted.
# 3. The encoder cache is exhausted.
# NOTE(woosuk): Here, by doing `continue` instead of `break`,
# we do not strictly follow the FCFS scheduling policy and
# allow the lower-priority requests to be scheduled.
req_index += 1
continue
if (self.use_pp and self.enable_minimal_injection and self.allow_minimal_injection
and len(self.waiting) == 0 and token_budget > 0):
# Ensure we are truly at a boundary (no target tokens)
# and have room to advance without exceeding model len.
if (request.num_tokens_with_spec
== request.num_computed_tokens):
max_additional = (self.max_model_len - 1
- request.num_computed_tokens)
if max_additional > 0:
# Advance with a single decode token.
num_new_tokens = 1
if num_new_tokens == 0:
# Still cannot schedule; allow lower-priority requests.
req_index += 1
continue
num_draft_tokens = max(
num_new_tokens + request.num_computed_tokens -
......@@ -343,6 +360,10 @@ class Scheduler(SchedulerInterface):
request = self.waiting.peek_request()
if request.is_finished():
self.waiting.pop_request()
continue
if request.is_finished():
self.waiting.pop_request()
continue
......@@ -815,7 +836,7 @@ class Scheduler(SchedulerInterface):
request = self.running[req_index]
num_new_tokens = (request.num_tokens_with_spec -
request.num_computed_tokens)
request.num_computed_tokens)
if (0 < self.scheduler_config.long_prefill_token_threshold <
num_new_tokens):
num_new_tokens = (
......
......@@ -265,6 +265,14 @@ class EngineCore:
# the scheduler may return an empty batch if all requests are scheduled.
# Note that this is not blocking.
if not self.batch_queue.full():
# Tell the scheduler to try minimal progress injection only when there is no running batch (queue is empty),
# to avoid unnecessary 0-token batches while a batch is already running.
try:
self.scheduler.set_allow_minimal_injection(
self.batch_queue.empty())
except Exception:
# Schedulers without this hook simply ignore it.
pass
scheduler_output = self.scheduler.schedule()
if scheduler_output.total_num_scheduled_tokens > 0:
future = self.model_executor.execute_model(scheduler_output)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment