Commit 78d3e006 authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge branch 'v0.9.2-dev-pp_bug' into 'v0.9.2-dev'

fix:修复pp卡住问题

See merge request dcutoolkit/deeplearing/vllm!245
parents a6088d09 71a40725
...@@ -175,7 +175,7 @@ if TYPE_CHECKING: ...@@ -175,7 +175,7 @@ if TYPE_CHECKING:
USE_FUSED_SILU_MUL_QUANT: bool = False USE_FUSED_SILU_MUL_QUANT: bool = False
VLLM_P2P_ASYNC: bool = False VLLM_P2P_ASYNC: bool = False
VLLM_P2P_BUF_TOKENS: int = 30000 VLLM_P2P_BUF_TOKENS: int = 30000
VLLM_ENABLE_OUTPUT_PLACEHOLDERS: bool = False VLLM_SCHED_ENABLE_MINIMAL_INJECTION: bool = False
def get_default_cache_root(): def get_default_cache_root():
return os.getenv( return os.getenv(
...@@ -1143,9 +1143,10 @@ environment_variables: dict[str, Callable[[], Any]] = { ...@@ -1143,9 +1143,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
# pd separation p2p async buf tokens # pd separation p2p async buf tokens
"VLLM_P2P_BUF_TOKENS": "VLLM_P2P_BUF_TOKENS":
lambda: int(os.getenv("VLLM_P2P_BUF_TOKENS", "30000")), lambda: int(os.getenv("VLLM_P2P_BUF_TOKENS", "30000")),
# vllm will enable output placeholders # vllm will enable minimal injection for pipeline parallel scheduling
"VLLM_ENABLE_OUTPUT_PLACEHOLDERS": "VLLM_SCHED_ENABLE_MINIMAL_INJECTION":
lambda: bool(int(os.getenv("VLLM_ENABLE_OUTPUT_PLACEHOLDERS", "0"))), lambda: (os.getenv("VLLM_SCHED_ENABLE_MINIMAL_INJECTION", "0").lower() in
("true", "1")),
} }
# --8<-- [end:env-vars-definition] # --8<-- [end:env-vars-definition]
......
...@@ -148,3 +148,6 @@ class SchedulerInterface(ABC): ...@@ -148,3 +148,6 @@ class SchedulerInterface(ABC):
def get_kv_connector(self) -> Optional["KVConnectorBase_V1"]: def get_kv_connector(self) -> Optional["KVConnectorBase_V1"]:
return None return None
def set_allow_minimal_injection(self, allow: bool) -> None:
return
\ No newline at end of file
...@@ -75,8 +75,7 @@ class Scheduler(SchedulerInterface): ...@@ -75,8 +75,7 @@ class Scheduler(SchedulerInterface):
self.enable_kv_cache_events = ( self.enable_kv_cache_events = (
self.kv_events_config is not None self.kv_events_config is not None
and self.kv_events_config.enable_kv_cache_events) and self.kv_events_config.enable_kv_cache_events)
# Cache the output placeholders feature flag to avoid repeated envs access self.enable_minimal_injection = envs.VLLM_SCHED_ENABLE_MINIMAL_INJECTION
self.enable_output_placeholders = envs.VLLM_ENABLE_OUTPUT_PLACEHOLDERS
# Create KVConnector for the Scheduler. Note that each Worker # Create KVConnector for the Scheduler. Note that each Worker
# will have a corresponding KVConnector with Role=WORKER. # will have a corresponding KVConnector with Role=WORKER.
...@@ -165,6 +164,11 @@ class Scheduler(SchedulerInterface): ...@@ -165,6 +164,11 @@ class Scheduler(SchedulerInterface):
enable_kv_cache_events=self.enable_kv_cache_events, enable_kv_cache_events=self.enable_kv_cache_events,
) )
self.use_pp = self.parallel_config.pipeline_parallel_size > 1 self.use_pp = self.parallel_config.pipeline_parallel_size > 1
self.allow_minimal_injection: bool = False
# Engine hint to gate minimal-progress injection (0-token breaker)
def set_allow_minimal_injection(self, allow: bool) -> None:
self.allow_minimal_injection = allow
def schedule_default(self) -> SchedulerOutput: def schedule_default(self) -> SchedulerOutput:
# NOTE(woosuk) on the scheduling algorithm: # NOTE(woosuk) on the scheduling algorithm:
...@@ -208,11 +212,8 @@ class Scheduler(SchedulerInterface): ...@@ -208,11 +212,8 @@ class Scheduler(SchedulerInterface):
while req_index < len(self.running) and token_budget > 0: while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index] request = self.running[req_index]
# Include output placeholders so that decode keeps a minimal
# backlog, avoiding 0-token plateaus when resuming from WAITING num_new_tokens = (request.num_tokens_with_spec -
# or immediately after prefill finishes.
num_new_tokens = (request.num_tokens_with_spec +
request.num_output_placeholders -
request.num_computed_tokens) request.num_computed_tokens)
if (0 < self.scheduler_config.long_prefill_token_threshold < if (0 < self.scheduler_config.long_prefill_token_threshold <
num_new_tokens): num_new_tokens):
...@@ -243,9 +244,21 @@ class Scheduler(SchedulerInterface): ...@@ -243,9 +244,21 @@ class Scheduler(SchedulerInterface):
# not finished yet. # not finished yet.
# 2. The encoder budget is exhausted. # 2. The encoder budget is exhausted.
# 3. The encoder cache is exhausted. # 3. The encoder cache is exhausted.
# Allow lower-priority requests to be scheduled in this step. if (self.use_pp and self.enable_minimal_injection and self.allow_minimal_injection
req_index += 1 and len(self.waiting) == 0 and token_budget > 0):
continue # Ensure we are truly at a boundary (no target tokens)
# and have room to advance without exceeding model len.
if (request.num_tokens_with_spec
== request.num_computed_tokens):
max_additional = (self.max_model_len - 1
- request.num_computed_tokens)
if max_additional > 0:
# Advance with a single decode token.
num_new_tokens = 1
if num_new_tokens == 0:
# Still cannot schedule; allow lower-priority requests.
req_index += 1
continue
num_draft_tokens = max( num_draft_tokens = max(
num_new_tokens + request.num_computed_tokens - num_new_tokens + request.num_computed_tokens -
...@@ -813,11 +826,8 @@ class Scheduler(SchedulerInterface): ...@@ -813,11 +826,8 @@ class Scheduler(SchedulerInterface):
while req_index < len(self.running) and token_budget > 0: while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index] request = self.running[req_index]
# Include output placeholders to maintain a minimal backlog
# for decode, avoiding 0-token plateaus after prefill or when num_new_tokens = (request.num_tokens_with_spec -
# resuming from WAITING under PP/speculative paths.
num_new_tokens = (request.num_tokens_with_spec +
request.num_output_placeholders -
request.num_computed_tokens) request.num_computed_tokens)
if (0 < self.scheduler_config.long_prefill_token_threshold < if (0 < self.scheduler_config.long_prefill_token_threshold <
num_new_tokens): num_new_tokens):
...@@ -1037,15 +1047,6 @@ class Scheduler(SchedulerInterface): ...@@ -1037,15 +1047,6 @@ class Scheduler(SchedulerInterface):
request = self.requests[req_id] request = self.requests[req_id]
request.num_computed_tokens += num_scheduled_token request.num_computed_tokens += num_scheduled_token
# 在推进已计算的 token 数量之后,对于本步骤将生成新 token 的请求,
# 增加一个输出占位符。这将创建一个小的赤字,确保即使在赤字本来为0的边界处,
# 随后的调度步骤也能继续安排解码任务。
if self.enable_output_placeholders:
for req_id in scheduler_output.num_scheduled_tokens:
request = self.requests[req_id]
if (request.num_computed_tokens
== request.num_tokens + request.num_output_placeholders):
request.num_output_placeholders += 1
# Clear the finished request IDs. # Clear the finished request IDs.
# NOTE: We shouldn't do self.finished_req_ids.clear() here because # NOTE: We shouldn't do self.finished_req_ids.clear() here because
...@@ -1265,12 +1266,6 @@ class Scheduler(SchedulerInterface): ...@@ -1265,12 +1266,6 @@ class Scheduler(SchedulerInterface):
# NOTE: once we support N tokens per step (spec decode), # NOTE: once we support N tokens per step (spec decode),
# the outer lists can be of length > 1. # the outer lists can be of length > 1.
new_logprobs = logprobs.slice(req_index, req_index + 1) new_logprobs = logprobs.slice(req_index, req_index + 1)
# 按照本步实际生成的token数量(如果有),减少输出占位符的数量。
if self.enable_output_placeholders:
if new_token_ids:
request.num_output_placeholders = max(
0, request.num_output_placeholders - len(new_token_ids))
if new_token_ids and self.structured_output_manager.should_advance( if new_token_ids and self.structured_output_manager.should_advance(
request): request):
......
...@@ -265,6 +265,14 @@ class EngineCore: ...@@ -265,6 +265,14 @@ class EngineCore:
# the scheduler may return an empty batch if all requests are scheduled. # the scheduler may return an empty batch if all requests are scheduled.
# Note that this is not blocking. # Note that this is not blocking.
if not self.batch_queue.full(): if not self.batch_queue.full():
# Tell the scheduler to try minimal progress injection only when there is no running batch (queue is empty),
# to avoid unnecessary 0-token batches while a batch is already running.
try:
self.scheduler.set_allow_minimal_injection(
self.batch_queue.empty())
except Exception:
# Schedulers without this hook simply ignore it.
pass
scheduler_output = self.scheduler.schedule() scheduler_output = self.scheduler.schedule()
if scheduler_output.total_num_scheduled_tokens > 0: if scheduler_output.total_num_scheduled_tokens > 0:
future = self.model_executor.execute_model(scheduler_output) future = self.model_executor.execute_model(scheduler_output)
......
...@@ -82,11 +82,6 @@ class Request: ...@@ -82,11 +82,6 @@ class Request:
self.num_generated_token_ids = 0 self.num_generated_token_ids = 0
self.cache_salt: Optional[str] = cache_salt self.cache_salt: Optional[str] = cache_salt
# 用于异步调度,确保解码过程能够持续推进:
# “输出占位符”的数量,表示我们预计即将生成的token数量。
# 其作用类似一个小的积压队列,用来避免在从WAITING状态恢复或解码初始时,
# 因待生成token数量为0而导致调度停滞的情况。
self.num_output_placeholders = 0
# Multi-modal related # Multi-modal related
self.mm_positions = multi_modal_placeholders or [] self.mm_positions = multi_modal_placeholders or []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment