Unverified commit a5b17fba, authored by Ronen Schaffer and committed by GitHub
Browse files

[KV Offload] Implement `shutdown()` in `OffloadingConnector` and related classes (#39182)


Signed-off-by: Ronen Schaffer <ronen.schaffer@ibm.com>
parent c48b2b83
...@@ -428,3 +428,6 @@ class OffloadingConnectorScheduler: ...@@ -428,3 +428,6 @@ class OffloadingConnectorScheduler:
medium=event.medium, medium=event.medium,
lora_name=None, lora_name=None,
) )
def shutdown(self) -> None:
    """Shut down the scheduler side of the connector.

    Delegates to ``self.manager.shutdown()``; this method holds no other
    state of its own to release.
    """
    self.manager.shutdown()
...@@ -394,3 +394,13 @@ class OffloadingConnectorWorker: ...@@ -394,3 +394,13 @@ class OffloadingConnectorWorker:
kv_connector_stats = self.kv_connector_stats kv_connector_stats = self.kv_connector_stats
self.kv_connector_stats = OffloadingConnectorStats() self.kv_connector_stats = OffloadingConnectorStats()
return kv_connector_stats return kv_connector_stats
def shutdown(self) -> None:
    """Discard all tracked job state, then shut down the underlying worker.

    The bookkeeping containers are emptied first so nothing that was
    pending is submitted afterwards; ``self.worker.shutdown()`` is called
    last.
    """
    # Drop deferred store jobs: there is no point in submitting
    # them during shutdown.
    self._unsubmitted_store_jobs.clear()

    # Forget the remaining per-job / per-request bookkeeping as well.
    self._jobs.clear()
    self._load_job.clear()
    self._store_jobs.clear()
    self._finished_reqs_waiting_for_store.clear()

    # Finally hand over to the worker so it can shut down its handlers.
    self.worker.shutdown()
...@@ -64,6 +64,12 @@ class OffloadingConnector(KVConnectorBase_V1): ...@@ -64,6 +64,12 @@ class OffloadingConnector(KVConnectorBase_V1):
elif role == KVConnectorRole.WORKER: elif role == KVConnectorRole.WORKER:
self.connector_worker = OffloadingConnectorWorker(spec) self.connector_worker = OffloadingConnectorWorker(spec)
def shutdown(self) -> None:
    """Shut down whichever role-specific component this connector owns.

    The worker component (if present) is shut down first, then the
    scheduler component (if present). A component that is ``None`` is
    skipped.
    """
    if self.connector_worker is not None:
        self.connector_worker.shutdown()
    if self.connector_scheduler is not None:
        self.connector_scheduler.shutdown()
def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]): def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
assert self.connector_worker is not None assert self.connector_worker is not None
self.connector_worker.register_kv_caches(kv_caches) self.connector_worker.register_kv_caches(kv_caches)
......
...@@ -178,3 +178,7 @@ class OffloadingManager(ABC): ...@@ -178,3 +178,7 @@ class OffloadingManager(ABC):
New OffloadingEvents collected since the last call. New OffloadingEvents collected since the last call.
""" """
return () return ()
def shutdown(self) -> None:
    """Shutdown the manager and release any resources.

    The base implementation does nothing; subclasses that hold resources
    can override it.
    """
    return
...@@ -274,6 +274,14 @@ class SingleDirectionOffloadingHandler(OffloadingHandler): ...@@ -274,6 +274,14 @@ class SingleDirectionOffloadingHandler(OffloadingHandler):
if event is not None: if event is not None:
event.synchronize() event.synchronize()
def shutdown(self) -> None:
    """Wait for every pending transfer, then drop cached events and streams.

    Transfers are popped from the front of ``self._transfers`` one at a
    time and their ``end_event`` is synchronized before the next one is
    taken. Afterwards the transfer-event map and the stream/event pools
    are cleared.
    """
    # Drain in FIFO order; each synchronize() call is expected to block
    # until that transfer has finished.
    while self._transfers:
        transfer = self._transfers.popleft()
        transfer.end_event.synchronize()

    # Nothing is in flight any more, so the cached objects can be released.
    self._transfer_events.clear()
    self._stream_pool.clear()
    self._event_pool.clear()
class CpuGpuOffloadingHandlers: class CpuGpuOffloadingHandlers:
def __init__( def __init__(
......
...@@ -69,6 +69,10 @@ class OffloadingHandler(ABC): ...@@ -69,6 +69,10 @@ class OffloadingHandler(ABC):
job_ids: The set of job IDs to wait for. job_ids: The set of job IDs to wait for.
""" """
def shutdown(self) -> None:
    """Shutdown the handler and release any resources.

    The base implementation does nothing; handlers that own resources
    can override it.
    """
    return
class OffloadingWorker: class OffloadingWorker:
""" """
...@@ -166,3 +170,7 @@ class OffloadingWorker: ...@@ -166,3 +170,7 @@ class OffloadingWorker:
""" """
for handler in self.handlers: for handler in self.handlers:
handler.wait(job_ids) handler.wait(job_ids)
def shutdown(self) -> None:
    """Shut down every registered handler.

    Calls ``shutdown()`` on each element of ``self.handlers`` in iteration
    order. An exception raised by a handler propagates immediately.
    """
    for handler in self.handlers:
        handler.shutdown()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment