Unverified Commit 2ef69456 authored by Yuwei An's avatar Yuwei An Committed by GitHub
Browse files

[LMCache] Fault Tolerance Mechanism (#36586)


Signed-off-by: default avatarOasis-Git <ayw.sirius19@gmail.com>
parent 17852aa5
...@@ -101,7 +101,11 @@ def extract_world_size_and_kv_rank( ...@@ -101,7 +101,11 @@ def extract_world_size_and_kv_rank(
def create_scheduler_adapter( def create_scheduler_adapter(
server_url: str, zmq_context: zmq.Context, vllm_config: VllmConfig server_url: str,
zmq_context: zmq.Context,
vllm_config: VllmConfig,
mq_timeout: float,
heartbeat_interval: float,
) -> LMCacheMPSchedulerAdapter: ) -> LMCacheMPSchedulerAdapter:
world_size, kv_rank = extract_world_size_and_kv_rank( world_size, kv_rank = extract_world_size_and_kv_rank(
vllm_config.parallel_config.world_size, vllm_config.parallel_config.world_size,
...@@ -123,12 +127,18 @@ def create_scheduler_adapter( ...@@ -123,12 +127,18 @@ def create_scheduler_adapter(
world_size, world_size,
kv_rank, kv_rank,
vllm_config.cache_config.block_size, vllm_config.cache_config.block_size,
mq_timeout=mq_timeout,
heartbeat_interval=heartbeat_interval,
**kwargs, **kwargs,
) )
def create_worker_adapter( def create_worker_adapter(
server_url: str, zmq_context: zmq.Context, vllm_config: VllmConfig server_url: str,
zmq_context: zmq.Context,
vllm_config: VllmConfig,
mq_timeout: float,
heartbeat_interval: float,
) -> LMCacheMPWorkerAdapter: ) -> LMCacheMPWorkerAdapter:
world_size, kv_rank = extract_world_size_and_kv_rank( world_size, kv_rank = extract_world_size_and_kv_rank(
vllm_config.parallel_config.world_size, vllm_config.parallel_config.world_size,
...@@ -142,6 +152,8 @@ def create_worker_adapter( ...@@ -142,6 +152,8 @@ def create_worker_adapter(
world_size, world_size,
kv_rank, kv_rank,
vllm_config.cache_config.block_size, vllm_config.cache_config.block_size,
mq_timeout=mq_timeout,
heartbeat_interval=heartbeat_interval,
) )
...@@ -413,6 +425,9 @@ class LMCacheMPConnector(KVConnectorBase_V1): ...@@ -413,6 +425,9 @@ class LMCacheMPConnector(KVConnectorBase_V1):
Extra configs (kv_transfer_config.extra_config): Extra configs (kv_transfer_config.extra_config):
- lmcache.mp.host: the host of the LMCache server. - lmcache.mp.host: the host of the LMCache server.
- lmcache.mp.port: the port of the LMCache server. - lmcache.mp.port: the port of the LMCache server.
- lmcache.mp.mq_timeout: timeout (seconds) for message queue requests.
- lmcache.mp.heartbeat_interval: interval (seconds) between server
heartbeat pings.
""" """
def __init__( def __init__(
...@@ -430,17 +445,35 @@ class LMCacheMPConnector(KVConnectorBase_V1): ...@@ -430,17 +445,35 @@ class LMCacheMPConnector(KVConnectorBase_V1):
server_port = vllm_config.kv_transfer_config.get_from_extra_config( server_port = vllm_config.kv_transfer_config.get_from_extra_config(
"lmcache.mp.port", 5555 "lmcache.mp.port", 5555
) )
mq_timeout = float(
vllm_config.kv_transfer_config.get_from_extra_config(
"lmcache.mp.mq_timeout", 300.0
)
)
heartbeat_interval = float(
vllm_config.kv_transfer_config.get_from_extra_config(
"lmcache.mp.heartbeat_interval", 10.0
)
)
server_url = f"{server_host}:{server_port}" server_url = f"{server_host}:{server_port}"
zmq_context = zmq.Context.instance() zmq_context = zmq.Context.instance()
if self.role == KVConnectorRole.SCHEDULER: if self.role == KVConnectorRole.SCHEDULER:
self.scheduler_adapter = create_scheduler_adapter( self.scheduler_adapter = create_scheduler_adapter(
server_url, zmq_context, vllm_config server_url,
zmq_context,
vllm_config,
mq_timeout,
heartbeat_interval,
) )
self.request_trackers: dict[str, LMCacheMPRequestTracker] = {} self.request_trackers: dict[str, LMCacheMPRequestTracker] = {}
elif self.role == KVConnectorRole.WORKER: elif self.role == KVConnectorRole.WORKER:
self.worker_adapter = create_worker_adapter( self.worker_adapter = create_worker_adapter(
server_url, zmq_context, vllm_config server_url,
zmq_context,
vllm_config,
mq_timeout,
heartbeat_interval,
) )
else: else:
raise ValueError(f"Unknown KVConnectorRole: {self.role}") raise ValueError(f"Unknown KVConnectorRole: {self.role}")
...@@ -616,8 +649,7 @@ class LMCacheMPConnector(KVConnectorBase_V1): ...@@ -616,8 +649,7 @@ class LMCacheMPConnector(KVConnectorBase_V1):
- Sync loading: failed blocks should be reported in the forward - Sync loading: failed blocks should be reported in the forward
pass in which they are detected. pass in which they are detected.
""" """
# TODO: add error tracking return self.worker_adapter.get_block_ids_with_load_errors()
return set()
def shutdown(self): def shutdown(self):
""" """
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment