"docs/vscode:/vscode.git/clone" did not exist on "2e569756570bbe1297694ec1dcc93844198ed685"
Unverified Commit 81819fbc authored by mohammedabdulwahhab, committed by GitHub
Browse files

fix(vllm): initialize failover flock in prefill worker (#8367)


Signed-off-by: mohammedabdulwahhab <furkhan324@berkeley.edu>
parent c4964f55
......@@ -187,6 +187,36 @@ class WorkerFactory:
finally:
handler.cleanup()
async def _maybe_wait_for_failover_lock(
    self,
    handler,
    runtime: DistributedRuntime,
    config: Config,
) -> None:
    """Hold a shadow-mode engine asleep until it acquires the failover lock.

    Does nothing unless ``config.gms_shadow_mode`` is truthy. In shadow mode
    the steps run strictly in this order:

    1. quiesce the engine through ``handler._quiesce_controller``;
    2. report the runtime as healthy so the startup probe passes while the
       engine is still asleep;
    3. await the flock-based failover lock;
    4. resume the engine and mark the controller as resumed.

    The caller is expected to continue with registration afterwards.

    Args:
        handler: Worker handler exposing ``_quiesce_controller``.
        runtime: Runtime whose health status is flipped to ``True``.
        config: Worker configuration; only ``gms_shadow_mode`` is read.

    Environment:
        FAILOVER_LOCK_PATH: Lock file path (default ``/shared/failover.lock``).
        ENGINE_ID: Suffix of the ``engine-<id>`` identity handed to the lock
            (default ``"0"``).
    """
    if not config.gms_shadow_mode:
        # Regular (non-shadow) workers never wait on the failover lock.
        return

    # Put the engine to sleep first; the argument is presumably the sleep
    # level -- TODO confirm against the quiesce controller.
    await handler._quiesce_controller.quiesce(1)
    runtime.set_health_status(True)
    logger.info("[Shadow] Engine sleeping, startup probe now passing, waiting for lock")

    # Imported lazily, and only after the engine is quiesced, so the
    # dependency is needed solely on the shadow-mode path.
    from gpu_memory_service.failover_lock.flock import FlockFailoverLock

    failover_lock_file = os.environ.get("FAILOVER_LOCK_PATH", "/shared/failover.lock")
    holder_identity = "engine-" + os.environ.get("ENGINE_ID", "0")

    # NOTE(review): the lock object is only referenced locally; whether the
    # underlying flock outlives this call depends on FlockFailoverLock --
    # confirm it is not released when the object is garbage collected.
    failover_lock = FlockFailoverLock(failover_lock_file)
    await failover_lock.acquire(engine_id=holder_identity)
    logger.info("[Shadow] Lock acquired, waking engine")

    # This engine now holds the lock: wake it and record the resumed state.
    await handler._quiesce_controller.resume()
    handler._quiesce_controller.mark_resumed()
    logger.info("[Shadow] Engine awake, registering with discovery")
async def _create_decode_worker(
self,
runtime: DistributedRuntime,
......@@ -354,27 +384,7 @@ class WorkerFactory:
"The chat template will be loaded but the /v1/chat/completions endpoint will not be available."
)
if config.gms_shadow_mode:
# Shadow mode: lock-driven activation.
# Flow: sleep → startup probe passes → block on lock → wake → register.
await handler._quiesce_controller.quiesce(1)
runtime.set_health_status(True)
logger.info(
"[Shadow] Engine sleeping, startup probe now passing, waiting for lock"
)
from gpu_memory_service.failover_lock.flock import FlockFailoverLock
lock_path = os.environ.get("FAILOVER_LOCK_PATH", "/shared/failover.lock")
engine_id = os.environ.get("ENGINE_ID", "0")
lock = FlockFailoverLock(lock_path)
await lock.acquire(engine_id=f"engine-{engine_id}")
logger.info("[Shadow] Lock acquired, waking engine")
await handler._quiesce_controller.resume()
handler._quiesce_controller.mark_resumed()
logger.info("[Shadow] Engine awake, registering with discovery")
await self._maybe_wait_for_failover_lock(handler, runtime, config)
# Wait for self-benchmark to complete before registering.
bench_cfg = vllm_config.additional_config.get("benchmark")
......@@ -574,6 +584,8 @@ class WorkerFactory:
"Registered engine routes: /engine/sleep, /engine/wake_up, /engine/scale_elastic_ep"
)
await self._maybe_wait_for_failover_lock(handler, runtime, config)
# Wait for self-benchmark to complete before registering.
bench_cfg = vllm_config.additional_config.get("benchmark")
if bench_cfg:
......
......@@ -113,8 +113,7 @@ class GMSShadowModelRunner(GPUModelRunner):
# Re-register with KV transfer group (skipped at init since kv_caches was {}).
# Mirrors GPUModelRunner.initialize_kv_cache() — update if upstream changes.
try:
from vllm.distributed.kv_transfer.kv_connector.v1.base import (
from vllm.distributed.kv_transfer import (
get_kv_transfer_group,
has_kv_transfer_group,
)
......@@ -122,9 +121,7 @@ class GMSShadowModelRunner(GPUModelRunner):
if has_kv_transfer_group() and kv_caches:
kv_transfer_group = get_kv_transfer_group()
kv_transfer_group.register_kv_caches(kv_caches)
logger.debug("[Shadow] Registered KV caches with transfer group")
except ImportError:
logger.debug("[Shadow] KV transfer group not available")
logger.info("[Shadow] Registered KV caches with transfer group")
total_bytes = sum(t.numel() * t.element_size() for t in kv_caches.values())
msg = "[Shadow] Allocated KV cache on wake: %.2f GiB (%d tensors)" % (
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment