Unverified Commit 467d3247 authored by Yuwei An, committed by GitHub
Browse files

[LMCache] vLLM Block Allocation Event (#38856)


Signed-off-by: yuwei <yuwei@dev.local>
Co-authored-by: yuwei <yuwei@dev.local>
parent e5de19ff
...@@ -29,7 +29,18 @@ try: ...@@ -29,7 +29,18 @@ try:
LMCacheMPWorkerAdapter, LMCacheMPWorkerAdapter,
LoadStoreOp, LoadStoreOp,
) )
try:
from lmcache.v1.multiprocess.custom_types import RequestAllocationRecord
except ImportError:
from lmcache.v1.multiprocess.custom_types import (
BlockAllocationRecord as RequestAllocationRecord,
)
except ImportError: except ImportError:
from lmcache.v1.multiprocess.custom_types import (
BlockAllocationRecord as RequestAllocationRecord,
)
from vllm.distributed.kv_transfer.kv_connector.v1.lmcache_integration import ( from vllm.distributed.kv_transfer.kv_connector.v1.lmcache_integration import (
LMCacheMPSchedulerAdapter, LMCacheMPSchedulerAdapter,
LMCacheMPWorkerAdapter, LMCacheMPWorkerAdapter,
...@@ -837,6 +848,9 @@ class LMCacheMPConnector(KVConnectorBase_V1): ...@@ -837,6 +848,9 @@ class LMCacheMPConnector(KVConnectorBase_V1):
if len(metadata) > 0: if len(metadata) > 0:
logger.debug("Final connector metadata: %s", metadata) logger.debug("Final connector metadata: %s", metadata)
# Report block allocation deltas to LMCache for observability
self._report_block_allocation_deltas(scheduler_output)
return metadata return metadata
def update_connector_output(self, connector_output: KVConnectorOutput): def update_connector_output(self, connector_output: KVConnectorOutput):
...@@ -1007,6 +1021,64 @@ class LMCacheMPConnector(KVConnectorBase_V1): ...@@ -1007,6 +1021,64 @@ class LMCacheMPConnector(KVConnectorBase_V1):
if r_meta is not None: if r_meta is not None:
metadata.add_request_metadata(r_meta) metadata.add_request_metadata(r_meta)
def _report_block_allocation_deltas(
    self,
    scheduler_output: SchedulerOutput,
) -> None:
    """Collect this step's block-allocation deltas and hand them to LMCache.

    One ``RequestAllocationRecord`` is built for each scheduled request that
    has an entry in ``self.request_trackers``:

    * Newly scheduled requests report every block id currently held by
      their tracker, together with the leading token ids that span those
      blocks (at most ``num_blocks * vllm_block_size`` tokens).
    * Cached requests report only the block ids added in this step,
      together with the token ids at the tail positions of the tracker's
      block list.

    Requests with no tracker, and cached requests with no new block ids,
    produce no record. The scheduler adapter is called only when at least
    one record was produced.

    Args:
        scheduler_output: Scheduler result for the current step; its
            ``scheduled_new_reqs`` and ``scheduled_cached_reqs`` are read.
    """
    deltas: list[RequestAllocationRecord] = []

    # Newly scheduled requests: report the full block list and the tokens
    # spanning it, rather than only the slice scheduled in this step, so
    # the receiver can pair every block with its token content.
    for req in scheduler_output.scheduled_new_reqs:
        req_tracker = self.request_trackers.get(req.req_id)
        if req_tracker is not None:
            token_limit = (
                len(req_tracker.allocated_block_ids) * self.vllm_block_size
            )
            deltas.append(
                RequestAllocationRecord(
                    req_id=req.req_id,
                    new_block_ids=list(req_tracker.allocated_block_ids),
                    new_token_ids=list(req_tracker.all_token_ids[:token_limit]),
                )
            )

    # Cached requests: report just the blocks appended this step. The token
    # window is derived from block positions (not from the number of tokens
    # scheduled), treating the appended blocks as the tail of the tracker's
    # block list.
    cached = scheduler_output.scheduled_cached_reqs
    for position, req_id in enumerate(cached.req_ids):
        appended = reformat_block_ids(cached.new_block_ids[position])
        if not appended:
            continue

        req_tracker = self.request_trackers.get(req_id)
        if req_tracker is None:
            continue

        # The window ends at the last allocated block and reaches back over
        # exactly as many blocks as were appended.
        window_end = len(req_tracker.allocated_block_ids) * self.vllm_block_size
        window_start = window_end - len(appended) * self.vllm_block_size
        deltas.append(
            RequestAllocationRecord(
                req_id=req_id,
                new_block_ids=appended,
                new_token_ids=list(
                    req_tracker.all_token_ids[window_start:window_end]
                ),
            )
        )

    if not deltas:
        return
    self.scheduler_adapter.report_block_allocations(deltas)
def _get_request_tracker(self, request_id: str) -> LMCacheMPRequestTracker: def _get_request_tracker(self, request_id: str) -> LMCacheMPRequestTracker:
assert request_id in self.request_trackers, ( assert request_id in self.request_trackers, (
f"Request tracker for request_id {request_id} not found. " f"Request tracker for request_id {request_id} not found. "
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment