Commit 4fc7b5d8 authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge branch 'v0.9.2-dev' of http://10.16.6.30/dcutoolkit/deeplearing/vllm into v0.9.2-dev

parents 2bd4a707 78d3e006
......@@ -175,8 +175,12 @@ if TYPE_CHECKING:
USE_FUSED_SILU_MUL_QUANT: bool = False
VLLM_P2P_ASYNC: bool = False
VLLM_P2P_BUF_TOKENS: int = 30000
<<<<<<< HEAD
VLLM_ENABLE_OUTPUT_PLACEHOLDERS: bool = False
VLLM_USE_PD_SPLIT: bool = False
=======
VLLM_SCHED_ENABLE_MINIMAL_INJECTION: bool = False
>>>>>>> 78d3e006e7ce4f2444e42720d1e1485186312ae3
def get_default_cache_root():
return os.getenv(
......@@ -1143,9 +1147,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
# pd separation p2p async buf tokens
"VLLM_P2P_BUF_TOKENS":
lambda: int(os.getenv("VLLM_P2P_BUF_TOKENS", "30000")),
# vllm will enable output placeholders
"VLLM_ENABLE_OUTPUT_PLACEHOLDERS":
lambda: bool(int(os.getenv("VLLM_ENABLE_OUTPUT_PLACEHOLDERS", "0"))),
# vllm will enable minimal injection for pipeline parallel scheduling
"VLLM_SCHED_ENABLE_MINIMAL_INJECTION":
lambda: (os.getenv("VLLM_SCHED_ENABLE_MINIMAL_INJECTION", "0").lower() in
("true", "1")),
# vLLM will split prefill and decode, not mix up
"VLLM_USE_PD_SPLIT":
lambda: (os.environ.get("VLLM_USE_PD_SPLIT", "True").lower() in
......
......@@ -148,3 +148,6 @@ class SchedulerInterface(ABC):
def get_kv_connector(self) -> Optional["KVConnectorBase_V1"]:
return None
def set_allow_minimal_injection(self, allow: bool) -> None:
return
\ No newline at end of file
......@@ -75,8 +75,7 @@ class Scheduler(SchedulerInterface):
self.enable_kv_cache_events = (
self.kv_events_config is not None
and self.kv_events_config.enable_kv_cache_events)
# Cache the output placeholders feature flag to avoid repeated envs access
self.enable_output_placeholders = envs.VLLM_ENABLE_OUTPUT_PLACEHOLDERS
self.enable_minimal_injection = envs.VLLM_SCHED_ENABLE_MINIMAL_INJECTION
# Create KVConnector for the Scheduler. Note that each Worker
# will have a corresponding KVConnector with Role=WORKER.
......@@ -165,6 +164,11 @@ class Scheduler(SchedulerInterface):
enable_kv_cache_events=self.enable_kv_cache_events,
)
self.use_pp = self.parallel_config.pipeline_parallel_size > 1
self.allow_minimal_injection: bool = False
# Engine hint to gate minimal-progress injection (0-token breaker)
def set_allow_minimal_injection(self, allow: bool) -> None:
self.allow_minimal_injection = allow
def schedule_default(self) -> SchedulerOutput:
# NOTE(woosuk) on the scheduling algorithm:
......@@ -208,11 +212,8 @@ class Scheduler(SchedulerInterface):
while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index]
# Include output placeholders so that decode keeps a minimal
# backlog, avoiding 0-token plateaus when resuming from WAITING
# or immediately after prefill finishes.
num_new_tokens = (request.num_tokens_with_spec +
request.num_output_placeholders -
num_new_tokens = (request.num_tokens_with_spec -
request.num_computed_tokens)
if (0 < self.scheduler_config.long_prefill_token_threshold <
num_new_tokens):
......@@ -243,9 +244,21 @@ class Scheduler(SchedulerInterface):
# not finished yet.
# 2. The encoder budget is exhausted.
# 3. The encoder cache is exhausted.
# Allow lower-priority requests to be scheduled in this step.
req_index += 1
continue
if (self.use_pp and self.enable_minimal_injection and self.allow_minimal_injection
and len(self.waiting) == 0 and token_budget > 0):
# Ensure we are truly at a boundary (no target tokens)
# and have room to advance without exceeding model len.
if (request.num_tokens_with_spec
== request.num_computed_tokens):
max_additional = (self.max_model_len - 1
- request.num_computed_tokens)
if max_additional > 0:
# Advance with a single decode token.
num_new_tokens = 1
if num_new_tokens == 0:
# Still cannot schedule; allow lower-priority requests.
req_index += 1
continue
num_draft_tokens = max(
num_new_tokens + request.num_computed_tokens -
......@@ -813,11 +826,8 @@ class Scheduler(SchedulerInterface):
while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index]
# Include output placeholders to maintain a minimal backlog
# for decode, avoiding 0-token plateaus after prefill or when
# resuming from WAITING under PP/speculative paths.
num_new_tokens = (request.num_tokens_with_spec +
request.num_output_placeholders -
num_new_tokens = (request.num_tokens_with_spec -
request.num_computed_tokens)
if (0 < self.scheduler_config.long_prefill_token_threshold <
num_new_tokens):
......@@ -1037,15 +1047,6 @@ class Scheduler(SchedulerInterface):
request = self.requests[req_id]
request.num_computed_tokens += num_scheduled_token
# 在推进已计算的 token 数量之后,对于本步骤将生成新 token 的请求,
# 增加一个输出占位符。这将创建一个小的赤字,确保即使在赤字本来为0的边界处,
# 随后的调度步骤也能继续安排解码任务。
if self.enable_output_placeholders:
for req_id in scheduler_output.num_scheduled_tokens:
request = self.requests[req_id]
if (request.num_computed_tokens
== request.num_tokens + request.num_output_placeholders):
request.num_output_placeholders += 1
# Clear the finished request IDs.
# NOTE: We shouldn't do self.finished_req_ids.clear() here because
......@@ -1265,12 +1266,6 @@ class Scheduler(SchedulerInterface):
# NOTE: once we support N tokens per step (spec decode),
# the outer lists can be of length > 1.
new_logprobs = logprobs.slice(req_index, req_index + 1)
# 按照本步实际生成的token数量(如果有),减少输出占位符的数量。
if self.enable_output_placeholders:
if new_token_ids:
request.num_output_placeholders = max(
0, request.num_output_placeholders - len(new_token_ids))
if new_token_ids and self.structured_output_manager.should_advance(
request):
......
......@@ -265,6 +265,14 @@ class EngineCore:
# the scheduler may return an empty batch if all requests are scheduled.
# Note that this is not blocking.
if not self.batch_queue.full():
# Tell the scheduler to try minimal progress injection only when there is no running batch (queue is empty),
# to avoid unnecessary 0-token batches while a batch is already running.
try:
self.scheduler.set_allow_minimal_injection(
self.batch_queue.empty())
except Exception:
# Schedulers without this hook simply ignore it.
pass
scheduler_output = self.scheduler.schedule()
if scheduler_output.total_num_scheduled_tokens > 0:
future = self.model_executor.execute_model(scheduler_output)
......
......@@ -82,11 +82,6 @@ class Request:
self.num_generated_token_ids = 0
self.cache_salt: Optional[str] = cache_salt
# 用于异步调度,确保解码过程能够持续推进:
# “输出占位符”的数量,表示我们预计即将生成的token数量。
# 其作用类似一个小的积压队列,用来避免在从WAITING状态恢复或解码初始时,
# 因待生成token数量为0而导致调度停滞的情况。
self.num_output_placeholders = 0
# Multi-modal related
self.mm_positions = multi_modal_placeholders or []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment