Commit 327fdf18 authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge branch 'v0.9.2-dev-ds-pp-balance' into 'v0.9.2-dev-ds'

feat: pipeline_parallel新增pp域请求数均衡,VLLM_USE_PP_BALANCE控制,默认开启

See merge request dcutoolkit/deeplearing/vllm!267
parents 18a43696 f4b01cd4
......@@ -177,6 +177,7 @@ if TYPE_CHECKING:
VLLM_P2P_BUF_TOKENS: int = 30000
VLLM_SCHED_ENABLE_MINIMAL_INJECTION: bool = False
VLLM_USE_PD_SPLIT: bool = False
VLLM_USE_PP_BALANCE: bool = False
def get_default_cache_root():
return os.getenv(
......@@ -1094,7 +1095,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
"VLLM_USE_FLASH_ATTN_PA":
lambda: (os.environ.get("VLLM_USE_FLASH_ATTN_PA", "True").lower() in
("true", "1")),
# vLLM will use apex for rmsnorm
"VLLM_USE_APEX_RN":
lambda: (os.environ.get("VLLM_USE_APEX_RN", "False").lower() in
......@@ -1130,7 +1131,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
# vLLM will use lightop moe_align_block_size
"VLLM_USE_LIGHTOP_MOE_ALIGN":
lambda: (os.environ.get("VLLM_USE_LIGHTOP_MOE_ALIGN", "True").lower() in
("true", "1")),
("true", "1")),
# vLLM will use opt merge_aatn_states, not triton
"VLLM_USE_MERGE_ATTN_STATES_OPT":
lambda: (os.environ.get("VLLM_USE_MERGE_ATTN_STATES_OPT", "True").lower() in
......@@ -1153,20 +1154,23 @@ environment_variables: dict[str, Callable[[], Any]] = {
# vllm pd separation will be used async
"VLLM_P2P_ASYNC":
lambda: bool(int(os.getenv("VLLM_P2P_ASYNC", "0"))),
# pd separation p2p async buf tokens
"VLLM_P2P_BUF_TOKENS":
lambda: int(os.getenv("VLLM_P2P_BUF_TOKENS", "30000")),
lambda: int(os.getenv("VLLM_P2P_BUF_TOKENS", "30000")),
# vllm will enable minimal injection for pipeline parallel scheduling
"VLLM_SCHED_ENABLE_MINIMAL_INJECTION":
lambda: (os.getenv("VLLM_SCHED_ENABLE_MINIMAL_INJECTION", "0").lower() in
("true", "1")),
# vLLM will split prefill and decode, not mix up
"VLLM_USE_PD_SPLIT":
lambda: (os.environ.get("VLLM_USE_PD_SPLIT", "True").lower() in
("true", "1")),
("true", "1")),
"VLLM_USE_PP_BALANCE":
lambda: (os.getenv('VLLM_USE_PP_BALANCE', '1').lower() in
("true", "1")),
}
# --8<-- [end:env-vars-definition]
......
......@@ -213,9 +213,17 @@ class Scheduler(SchedulerInterface):
# First, schedule the RUNNING requests.
req_index = 0
if envs.VLLM_USE_PP_BALANCE and self.use_pp:
pipeline_size = self.parallel_config.pipeline_parallel_size
max_batch_running = (len(self.waiting) + len(self.running) + pipeline_size - 1 ) // pipeline_size
while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index]
if (envs.VLLM_USE_PP_BALANCE and self.use_pp and
len(scheduled_new_reqs) + len(scheduled_resumed_reqs)
+ len(scheduled_running_reqs) >= max_batch_running):
break
num_new_tokens = (request.num_tokens_with_spec -
request.num_computed_tokens)
if (0 < self.scheduler_config.long_prefill_token_threshold <
......@@ -358,8 +366,13 @@ class Scheduler(SchedulerInterface):
if len(self.running) == self.max_num_running_reqs:
break
if (envs.VLLM_USE_PP_BALANCE and self.use_pp and
len(scheduled_new_reqs) + len(scheduled_resumed_reqs)
+ len(scheduled_running_reqs) >= max_batch_running):
break
request = self.waiting.peek_request()
if request.is_finished():
self.waiting.pop_request()
continue
......@@ -648,11 +661,18 @@ class Scheduler(SchedulerInterface):
skipped_waiting_requests = create_request_queue(self.policy)
req_index = len(self.running)
if envs.VLLM_USE_PP_BALANCE and self.use_pp:
pipeline_size = self.parallel_config.pipeline_parallel_size
max_batch_running = (len(self.waiting) + len(self.running) + pipeline_size - 1 ) // pipeline_size
# First, schedule the WAITING requests.
while self.waiting and token_budget > 0:
if len(self.running) == self.max_num_running_reqs:
break
#TODO:考虑到decode过程中来新请求时,可以一次性处理所有请求的prefill 也许schedule the WAITING requests 中取消pp平衡效果更好
if (envs.VLLM_USE_PP_BALANCE and self.use_pp and
len(scheduled_new_reqs) + len(scheduled_resumed_reqs)
+ len(scheduled_running_reqs) >= max_batch_running):
break
request = self.waiting.peek_request()
# KVTransfer: skip request if still waiting for remote kvs.
......@@ -833,6 +853,10 @@ class Scheduler(SchedulerInterface):
if not scheduled_new_reqs and not scheduled_resumed_reqs:
req_index = 0
while req_index < len(self.running) and token_budget > 0:
if (envs.VLLM_USE_PP_BALANCE and self.use_pp and
len(scheduled_new_reqs) + len(scheduled_resumed_reqs)
+ len(scheduled_running_reqs) >= max_batch_running):
break
request = self.running[req_index]
num_new_tokens = (request.num_tokens_with_spec -
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment