Commit 5af06ae0 authored by laibao's avatar laibao
Browse files

Add PD-specific scheduling controls for resume cap and self-limit

parent 8d6b0b0a
...@@ -177,6 +177,10 @@ if TYPE_CHECKING: ...@@ -177,6 +177,10 @@ if TYPE_CHECKING:
VLLM_P2P_BUF_TOKENS: int = 30000 VLLM_P2P_BUF_TOKENS: int = 30000
VLLM_SCHED_ENABLE_MINIMAL_INJECTION: bool = False VLLM_SCHED_ENABLE_MINIMAL_INJECTION: bool = False
VLLM_USE_PD_SPLIT: bool = False VLLM_USE_PD_SPLIT: bool = False
# PD-specific scheduling fixes (resume-cap + self-limit). Controlled by
# a single env var VLLM_SCHED_ENABLE_PD_FIX.
VLLM_SCHED_ENABLE_PD_RESUME_CAP: bool = True
VLLM_SCHED_ENABLE_PD_SELF_LIMIT: bool = True
def get_default_cache_root(): def get_default_cache_root():
return os.getenv( return os.getenv(
...@@ -1150,7 +1154,17 @@ environment_variables: dict[str, Callable[[], Any]] = { ...@@ -1150,7 +1154,17 @@ environment_variables: dict[str, Callable[[], Any]] = {
# vLLM will split prefill and decode, not mix up # vLLM will split prefill and decode, not mix up
"VLLM_USE_PD_SPLIT": "VLLM_USE_PD_SPLIT":
lambda: (os.environ.get("VLLM_USE_PD_SPLIT", "True").lower() in lambda: (os.environ.get("VLLM_USE_PD_SPLIT", "True").lower() in
("true", "1")), ("true", "1")),
# PD-specific scheduling fixes (resume-cap + self-limit) are controlled by
# a single env var VLLM_SCHED_ENABLE_PD_FIX so they can be toggled together.
# Enable PD-specific PREEMPTED replay capping in schedule_split_pd.
"VLLM_SCHED_ENABLE_PD_RESUME_CAP":
lambda: (os.getenv("VLLM_SCHED_ENABLE_PD_FIX", "0").lower() in
("true", "1", "yes", "y", "t")),
# Enable PD-specific self-preemption avoidance + chunk shrinking in RUN.
"VLLM_SCHED_ENABLE_PD_SELF_LIMIT":
lambda: (os.getenv("VLLM_SCHED_ENABLE_PD_FIX", "0").lower() in
("true", "1", "yes", "y", "t")),
} }
# --8<-- [end:env-vars-definition] # --8<-- [end:env-vars-definition]
......
...@@ -76,6 +76,9 @@ class Scheduler(SchedulerInterface): ...@@ -76,6 +76,9 @@ class Scheduler(SchedulerInterface):
self.kv_events_config is not None self.kv_events_config is not None
and self.kv_events_config.enable_kv_cache_events) and self.kv_events_config.enable_kv_cache_events)
self.enable_minimal_injection = envs.VLLM_SCHED_ENABLE_MINIMAL_INJECTION self.enable_minimal_injection = envs.VLLM_SCHED_ENABLE_MINIMAL_INJECTION
# PD-only knobs to gate PREEMPTED replay capping and RUN self-limit.
self.enable_pd_resume_cap = envs.VLLM_SCHED_ENABLE_PD_RESUME_CAP
self.enable_pd_self_limit = envs.VLLM_SCHED_ENABLE_PD_SELF_LIMIT
# Create KVConnector for the Scheduler. Note that each Worker # Create KVConnector for the Scheduler. Note that each Worker
# will have a corresponding KVConnector with Role=WORKER. # will have a corresponding KVConnector with Role=WORKER.
...@@ -724,6 +727,19 @@ class Scheduler(SchedulerInterface): ...@@ -724,6 +727,19 @@ class Scheduler(SchedulerInterface):
< num_new_tokens): < num_new_tokens):
num_new_tokens = ( num_new_tokens = (
self.scheduler_config.long_prefill_token_threshold) self.scheduler_config.long_prefill_token_threshold)
# For PREEMPTED requests (resuming after preemption), we can
# optionally reduce the per-step replay size to mitigate very
# large replay batches that can stall pipeline parallelism.
# This behavior is gated by an env knob.
if (self.enable_pd_resume_cap
and request.status == RequestStatus.PREEMPTED):
new_num = max(num_new_tokens // 2, 1)
# if self.log_stats and new_num < num_new_tokens:
# logger.info(
# "PD resume-cap: halving PREEMPTED request %s "
# "chunk from %d to %d tokens",
# request.request_id, num_new_tokens, new_num)
num_new_tokens = new_num
# chunked prefill has to be enabled explicitly to allow # chunked prefill has to be enabled explicitly to allow
# pooling requests to be chunked # pooling requests to be chunked
...@@ -875,28 +891,74 @@ class Scheduler(SchedulerInterface): ...@@ -875,28 +891,74 @@ class Scheduler(SchedulerInterface):
num_draft_tokens=num_draft_tokens, num_draft_tokens=num_draft_tokens,
num_lookahead_tokens=self.num_lookahead_tokens) num_lookahead_tokens=self.num_lookahead_tokens)
if new_blocks is None: if new_blocks is None:
# The request cannot be scheduled. # The request cannot be scheduled with the current KV
# Preempt the lowest-priority request. # allocation. Try to preempt the lowest-priority
# request. Optionally avoid self-preemption by
# shrinking the chunk for the current request when it
# would otherwise be chosen as the victim.
if not self.running:
can_schedule = False
break
if self.policy == SchedulingPolicy.PRIORITY: if self.policy == SchedulingPolicy.PRIORITY:
preempted_req = max( candidate = max(
self.running, self.running,
key=lambda r: (r.priority, r.arrival_time), key=lambda r: (r.priority, r.arrival_time),
) )
self.running.remove(preempted_req)
else: else:
preempted_req = self.running.pop() candidate = self.running[-1]
self.kv_cache_manager.free(preempted_req) if candidate is request:
preempted_req.status = RequestStatus.PREEMPTED if self.enable_pd_self_limit:
preempted_req.num_computed_tokens = 0 # Avoid self-preemption: shrink this request's
if self.log_stats: # chunk to half of the remaining work and
preempted_req.record_event( # retry.
EngineCoreEventType.PREEMPTED, scheduled_timestamp) new_num = max(num_new_tokens // 2, 1)
if new_num < num_new_tokens:
self.waiting.prepend_request(preempted_req) num_new_tokens = new_num
preempted_reqs.append(preempted_req) num_draft_tokens = max(
if preempted_req == request: num_new_tokens
# No more request to preempt. + request.num_computed_tokens
- request.num_tokens, 0)
# Retry allocation with the smaller chunk
# size.
continue
# Already at the minimal chunk; cannot
# schedule more tokens for this request this
# round without self-preempting. Skip giving
# it new tokens.
# if self.log_stats:
# logger.info(
# "PD self-limit: cannot schedule request "
# "%s with %d new tokens without "
# "self-preempt; skipping new tokens "
# "this round",
# request.request_id, num_new_tokens)
can_schedule = False
break
# self.enable_pd_self_limit is False: fall back to
# the original behavior of self-preempting this
# request and then stopping scheduling for it in
# this step.
preempted_req = candidate
if self.policy == SchedulingPolicy.PRIORITY:
self.running.remove(preempted_req)
else:
self.running.pop()
self.kv_cache_manager.free(preempted_req)
preempted_req.status = RequestStatus.PREEMPTED
preempted_req.num_computed_tokens = 0
if self.log_stats:
preempted_req.record_event(
EngineCoreEventType.PREEMPTED,
scheduled_timestamp)
self.waiting.prepend_request(preempted_req)
self.preempted_req_ids.add(preempted_req.request_id)
preempted_reqs.append(preempted_req)
can_schedule = False can_schedule = False
break break
else: else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment