Unverified Commit 2fd5c704 authored by Shangming Cai's avatar Shangming Cai Committed by GitHub
Browse files

[PD] Fix abort_request for PD disaggregation (#8352)


Signed-off-by: default avatarShangming Cai <caishangming@linux.alibaba.com>
Co-authored-by: default avatarybyang <10629930+whybeyoung@users.noreply.github.com>
parent 4ad97370
...@@ -992,6 +992,14 @@ class MooncakeKVSender(BaseKVSender): ...@@ -992,6 +992,14 @@ class MooncakeKVSender(BaseKVSender):
) )
raise KVTransferError(self.bootstrap_room, failure_reason) raise KVTransferError(self.bootstrap_room, failure_reason)
def abort(self):
self.kv_mgr.record_failure(
self.bootstrap_room,
"Aborted by AbortReq.",
)
# Explicitly set the status to failure since this request has been aborted
self.conclude_state = KVPoll.Failed
class MooncakeKVReceiver(BaseKVReceiver): class MooncakeKVReceiver(BaseKVReceiver):
_ctx = zmq.Context() _ctx = zmq.Context()
...@@ -1305,6 +1313,14 @@ class MooncakeKVReceiver(BaseKVReceiver): ...@@ -1305,6 +1313,14 @@ class MooncakeKVReceiver(BaseKVReceiver):
) )
raise KVTransferError(self.bootstrap_room, failure_reason) raise KVTransferError(self.bootstrap_room, failure_reason)
def abort(self):
self.kv_mgr.record_failure(
self.bootstrap_room,
"Aborted by AbortReq.",
)
# Explicitly set the status to failure since this request has been aborted
self.conclude_state = KVPoll.Failed
class MooncakeKVBootstrapServer(BaseKVBootstrapServer): class MooncakeKVBootstrapServer(BaseKVBootstrapServer):
def __init__(self, port: int): def __init__(self, port: int):
......
...@@ -2440,6 +2440,37 @@ class Scheduler( ...@@ -2440,6 +2440,37 @@ class Scheduler(
req.grammar.cancel() req.grammar.cancel()
req.set_finish_with_abort("Aborted by AbortReq.") req.set_finish_with_abort("Aborted by AbortReq.")
# Delete requests not in the waiting queue when PD disaggregation is enabled
if self.disaggregation_mode == DisaggregationMode.PREFILL:
# Abort requests that have not yet been bootstrapped
for i, req in enumerate(self.disagg_prefill_bootstrap_queue.queue):
logger.debug(f"Abort bootstrap queue request. {req.rid=}")
if recv_req.abort_all or req.rid.startswith(recv_req.rid):
if hasattr(req.disagg_kv_sender, "abort"):
req.disagg_kv_sender.abort()
# Abort in-flight requests
for i, req in enumerate(self.disagg_prefill_inflight_queue):
logger.debug(f"Abort inflight queue request. {req.rid=}")
if recv_req.abort_all or req.rid.startswith(recv_req.rid):
if hasattr(req.disagg_kv_sender, "abort"):
req.disagg_kv_sender.abort()
elif self.disaggregation_mode == DisaggregationMode.DECODE:
# Abort requests that have not yet finished preallocation
for i, decode_req in enumerate(self.disagg_decode_prealloc_queue.queue):
logger.debug(f"Abort prealloc queue request. {decode_req.req.rid=}")
if recv_req.abort_all or decode_req.req.rid.startswith(recv_req.rid):
if hasattr(decode_req.kv_receiver, "abort"):
decode_req.kv_receiver.abort()
# Abort requests waiting for kvcache to release tree cache
for i, decode_req in enumerate(self.disagg_decode_transfer_queue.queue):
logger.debug(f"Abort transfer queue request. {decode_req.req.rid=}")
if recv_req.abort_all or decode_req.req.rid.startswith(recv_req.rid):
if hasattr(decode_req.kv_receiver, "abort"):
decode_req.kv_receiver.abort()
# Delete requests in the running batch # Delete requests in the running batch
if self.cur_batch is self.running_batch or self.cur_batch is None: if self.cur_batch is self.running_batch or self.cur_batch is None:
reqs = self.running_batch.reqs reqs = self.running_batch.reqs
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment