Commit 83750a84 authored by maxiang's avatar maxiang
Browse files

[TMP] 临时修改P节点 关于MTP new_token计算

parent 771ed2c4
......@@ -315,6 +315,17 @@ class Scheduler(SchedulerInterface):
pass
return num_new_tokens
def _kv_connector_lookahead_for_waiting(self, request: Request) -> int:
if self.connector is not None and self.connector.is_producer:
return 0
return 0 if request.num_computed_tokens == 0 else self.num_lookahead_tokens
def _kv_connector_lookahead_for_running(self) -> int:
if self.connector is not None and self.connector.is_producer:
return 0
return self.num_lookahead_tokens
def schedule_default(self) -> SchedulerOutput:
# NOTE(woosuk) on the scheduling algorithm:
# There's no "decoding phase" nor "prefill phase" in the scheduler.
......@@ -442,7 +453,7 @@ class Scheduler(SchedulerInterface):
new_blocks = self.kv_cache_manager.allocate_slots(
request,
num_new_tokens,
num_lookahead_tokens=self.num_lookahead_tokens,
num_lookahead_tokens=self._kv_connector_lookahead_for_running(),
)
if new_blocks is not None:
......@@ -480,6 +491,7 @@ class Scheduler(SchedulerInterface):
encoder_compute_budget += num_embeds_to_restore
req_index -= 1
else:
#从运行队列中弹出, 强占
preempted_req = self.running.pop()
self._preempt_request(preempted_req, scheduled_timestamp)
......@@ -564,6 +576,7 @@ class Scheduler(SchedulerInterface):
continue
# KVTransfer: skip request if still waiting for remote kvs.
if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
#检查kv cache是否已经传输完毕
is_ready = self._update_waiting_for_remote_kv(request)
if is_ready:
if request.num_preemptions:
......@@ -577,6 +590,7 @@ class Scheduler(SchedulerInterface):
"%s is still in WAITING_FOR_REMOTE_KVS state.",
request.request_id,
)
#如果依然没有传输完毕, 将request设置为可跳过的
self.waiting.pop_request()
skipped_waiting_requests.prepend_request(request)
continue
......@@ -667,7 +681,9 @@ class Scheduler(SchedulerInterface):
# We use `request.num_tokens` instead of
# `request.num_prompt_tokens` to consider the resumed
# requests, which have output tokens.
#总prompt tokens - 已计算tokens
num_new_tokens = request.num_tokens - num_computed_tokens
#长prefill token阈值 max-num-batched-tokens
threshold = self.scheduler_config.long_prefill_token_threshold
if 0 < threshold < num_new_tokens:
num_new_tokens = threshold
......@@ -702,7 +718,7 @@ class Scheduler(SchedulerInterface):
if num_new_tokens == 0:
# The request cannot be scheduled.
break
#区分FA 与 mamba , block size 意义不同
if self.need_mamba_block_aligned_split:
num_new_tokens = self._mamba_block_aligned_split(
request,
......@@ -718,16 +734,16 @@ class Scheduler(SchedulerInterface):
# extra block gets allocated which
# creates a mismatch between the number
# of local and remote blocks.
effective_lookahead_tokens = (
0 if request.num_computed_tokens == 0 else self.num_lookahead_tokens
effective_lookahead_tokens = self._kv_connector_lookahead_for_waiting(
request
)
num_encoder_tokens = (
self._num_encoder_max_input_tokens
if self.is_encoder_decoder and request.has_encoder_inputs
else 0
)
#分配新的blocks, 这里只有memory pool已经没有free block才返回None,
# 如果是不需要分配新block, new_blocks是空列表
new_blocks = self.kv_cache_manager.allocate_slots(
request,
num_new_tokens,
......@@ -755,6 +771,7 @@ class Scheduler(SchedulerInterface):
if self.connector is not None:
self.connector.update_state_after_alloc(
request,
#获取这个请求的全量 blocks
self.kv_cache_manager.get_blocks(request.request_id),
num_external_computed_tokens,
)
......@@ -779,12 +796,14 @@ class Scheduler(SchedulerInterface):
if request.status == RequestStatus.WAITING:
scheduled_new_reqs.append(request)
elif request.status == RequestStatus.PREEMPTED:
#恢复的请求
scheduled_resumed_reqs.append(request)
else:
raise RuntimeError(f"Invalid request status: {request.status}")
if self.lora_config and request.lora_request:
scheduled_loras.add(request.lora_request.lora_int_id)
#这里会将恢复或者新请求 重新分配 block
req_to_new_blocks[request.request_id] = (
self.kv_cache_manager.get_blocks(request.request_id)
)
......@@ -951,6 +970,7 @@ class Scheduler(SchedulerInterface):
request = self.waiting.peek_request()
# KVTransfer: skip request if still waiting for remote kvs.
#如果该请求为等待远端KV CACHE
if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
is_ready = self._update_waiting_for_remote_kv(request)
if is_ready:
......@@ -1101,13 +1121,8 @@ class Scheduler(SchedulerInterface):
if num_new_tokens == 0:
break
# Handles an edge case when P/D Disaggregation
# is used with Spec Decoding where an
# extra block gets allocated which
# creates a mismatch between the number
# of local and remote blocks.
effective_lookahead_tokens = (
0 if request.num_computed_tokens == 0 else self.num_lookahead_tokens
effective_lookahead_tokens = self._kv_connector_lookahead_for_waiting(
request
)
num_encoder_tokens = (
......@@ -1160,6 +1175,7 @@ class Scheduler(SchedulerInterface):
self._update_connector_prefix_cache_stats(request)
self.running.append(request)
if self.log_stats:
request.record_event(
EngineCoreEventType.SCHEDULED, scheduled_timestamp
......@@ -1173,9 +1189,11 @@ class Scheduler(SchedulerInterface):
if self.lora_config and request.lora_request:
scheduled_loras.add(request.lora_request.lora_int_id)
req_to_new_blocks[request.request_id] = (
self.kv_cache_manager.get_blocks(request.request_id)
)
num_scheduled_tokens[request.request_id] = num_new_tokens
token_budget -= num_new_tokens
request.status = RequestStatus.RUNNING
......@@ -1203,6 +1221,7 @@ class Scheduler(SchedulerInterface):
self.waiting.prepend_requests(skipped_waiting_requests)
# Next, schedule the RUNNING requests.
#只有当本轮没有调度任何 WAITING/RESUMED 请求时,才进入 RUNNING 调度
if not scheduled_new_reqs and not scheduled_resumed_reqs:
req_index = 0
while req_index < len(self.running) and token_budget > 0:
......@@ -1232,10 +1251,11 @@ class Scheduler(SchedulerInterface):
continue
num_new_tokens = (
request.num_tokens_with_spec
request.num_tokens_with_spec # 不开MTP = prompt + output + 1 / prompt + output + 2
+ request.num_output_placeholders
- request.num_computed_tokens
- request.num_computed_tokens #prompt token + output - 1
)
if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
num_new_tokens = self.scheduler_config.long_prefill_token_threshold
num_new_tokens = min(num_new_tokens, token_budget)
......@@ -1284,16 +1304,18 @@ class Scheduler(SchedulerInterface):
# NOTE(woosuk): Here, by doing `continue` instead of `break`,
# we do not strictly follow the FCFS scheduling policy and
# allow the lower-priority requests to be scheduled.
print("xiang ----- new_tokens == 0")
req_index += 1
continue
# Schedule newly needed KV blocks for the request.
with record_function_or_nullcontext("schedule: allocate_slots"):
while True:
#给Runing请求分配block
new_blocks = self.kv_cache_manager.allocate_slots(
request,
num_new_tokens,
num_lookahead_tokens=self.num_lookahead_tokens,
num_lookahead_tokens=self._kv_connector_lookahead_for_running(),
)
if new_blocks is not None:
......@@ -1512,8 +1534,10 @@ class Scheduler(SchedulerInterface):
assert request.status == RequestStatus.RUNNING, (
"Only running requests can be preempted"
)
#释放所有block
self.kv_cache_manager.free(request)
self.encoder_cache_manager.free(request)
#请求状态设置为被抢占
request.status = RequestStatus.PREEMPTED
request.num_computed_tokens = 0
request.spec_token_ids.clear()
......@@ -1522,6 +1546,7 @@ class Scheduler(SchedulerInterface):
request.record_event(EngineCoreEventType.PREEMPTED, timestamp)
# Put the request back to the waiting queue.
#放入waiting队列队头
self.waiting.prepend_request(request)
def _update_after_schedule(self, scheduler_output: SchedulerOutput) -> None:
......@@ -1633,12 +1658,17 @@ class Scheduler(SchedulerInterface):
scheduled_in_prev_step = req_id in self.prev_step_scheduled_req_ids
if idx >= num_running_reqs:
assert not scheduled_in_prev_step
#这里是恢复的请求ID
resumed_req_ids.add(req_id)
if not scheduled_in_prev_step:
all_token_ids[req_id] = req.all_token_ids.copy()
#这里加入新分配的block ids
new_block_ids.append(
req_to_new_blocks[req_id].get_block_ids(allow_none=True)
)
print("new_block_ids : ", new_block_ids)
num_computed_tokens.append(req.num_computed_tokens)
num_output_tokens.append(
req.num_output_tokens + req.num_output_placeholders
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment