Unverified Commit e38817fa authored by Mark McLoughlin's avatar Mark McLoughlin Committed by GitHub
Browse files

[Core][KV Connector] Remove use of num_cached_tokens in error handling (#38096)


Signed-off-by: default avatarMark McLoughlin <markmc@redhat.com>
parent 72cad44d
......@@ -1326,7 +1326,8 @@ class Scheduler(SchedulerInterface):
# load. Identify affected requests and adjust their computed token
# count to trigger recomputation of the invalid blocks.
failed_kv_load_req_ids = self._handle_invalid_blocks(
kv_connector_output.invalid_block_ids
kv_connector_output.invalid_block_ids,
num_scheduled_tokens,
)
# NOTE(woosuk): As len(num_scheduled_tokens) can be up to 1K or more,
......@@ -2133,6 +2134,7 @@ class Scheduler(SchedulerInterface):
self,
requests: Iterable[Request],
invalid_block_ids: set[int],
num_scheduled_tokens: dict[str, int],
evict_blocks: bool = True,
) -> tuple[set[str], int, set[int]]:
"""
......@@ -2146,6 +2148,7 @@ class Scheduler(SchedulerInterface):
Args:
requests: The set of requests to scan for invalid blocks.
invalid_block_ids: IDs of invalid blocks.
num_scheduled_tokens: req_id -> number of scheduled tokens.
evict_blocks: Whether to collect blocks for eviction (False for
async requests which aren't cached yet).
......@@ -2173,12 +2176,9 @@ class Scheduler(SchedulerInterface):
(req_block_ids,) = self.kv_cache_manager.get_block_ids(req_id)
# We iterate only over blocks that may contain externally computed
# tokens
if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
# Async loading. num_computed_tokens does not include new tokens
req_num_computed_tokens = request.num_computed_tokens
else:
# Sync loading. num_computed_tokens includes new tokens
req_num_computed_tokens = request.num_cached_tokens
req_num_computed_tokens = (
request.num_computed_tokens - num_scheduled_tokens.get(req_id, 0)
)
req_num_computed_blocks = (
req_num_computed_tokens + self.block_size - 1
......@@ -2225,15 +2225,17 @@ class Scheduler(SchedulerInterface):
# Currently this only applies to sync loading; Async
# loading does not yet support block sharing
total_affected_tokens += (
request.num_computed_tokens - request.num_cached_tokens
request.num_computed_tokens - req_num_computed_tokens
)
request.num_computed_tokens = request.num_cached_tokens
request.num_computed_tokens = req_num_computed_tokens
affected_req_ids.add(request.request_id)
return affected_req_ids, total_affected_tokens, blocks_to_evict
def _handle_invalid_blocks(self, invalid_block_ids: set[int]) -> set[str]:
def _handle_invalid_blocks(
self, invalid_block_ids: set[int], num_scheduled_tokens: dict[str, int]
) -> set[str]:
"""
Handle requests affected by invalid KV cache blocks.
......@@ -2250,7 +2252,10 @@ class Scheduler(SchedulerInterface):
)
async_failed_req_ids, num_failed_tokens, _ = (
self._update_requests_with_invalid_blocks(
async_load_reqs, invalid_block_ids, evict_blocks=False
async_load_reqs,
invalid_block_ids,
num_scheduled_tokens,
evict_blocks=False,
)
)
......@@ -2260,7 +2265,7 @@ class Scheduler(SchedulerInterface):
# handle sync loads (may be cached, collect blocks for eviction)
sync_failed_req_ids, num_failed_tokens, sync_blocks_to_evict = (
self._update_requests_with_invalid_blocks(
self.running, invalid_block_ids, evict_blocks=True
self.running, invalid_block_ids, num_scheduled_tokens, evict_blocks=True
)
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment