Unverified Commit 9df692c1 authored by mohammedabdulwahhab's avatar mohammedabdulwahhab Committed by GitHub
Browse files

feat: flock-based failover lock for engine leader election (#6817)


Signed-off-by: mohammedabdulwahhab <furkhan324@berkeley.edu>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 56dd0d90
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from gpu_memory_service.failover_lock.interface import FailoverLock, FailoverLockError
# Public names exported by this module (both come from the import above).
__all__ = ["FailoverLock", "FailoverLockError"]
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from gpu_memory_service.failover_lock.flock.lock import FlockFailoverLock
# Public names exported by this module (re-exported from the import above).
__all__ = ["FlockFailoverLock"]
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import fcntl
import logging
import os
import time
from gpu_memory_service.failover_lock.interface import FailoverLock, FailoverLockError
logger = logging.getLogger(__name__)
class FlockFailoverLock(FailoverLock):
    """flock-based failover lock.

    Uses flock(LOCK_EX) on a shared file as the lock primitive.
    The Linux kernel is the lock manager — no server process, no sidecar,
    no protocol. The lock is automatically released when the holding
    process dies (even via SIGKILL), because the kernel closes all file
    descriptors.

    Cross-container operation: containers sharing an emptyDir volume
    can contend for the same lock file.

    Re-entrancy: calling ``acquire()`` again on an instance that already
    holds the lock is harmless — the held descriptor is reused and only the
    owner stamp is refreshed. A *different* instance opens its own
    descriptor and therefore contends for the lock like any other client,
    even when it lives in the same process.
    """

    def __init__(self, lock_path: str):
        """Remember the lock file path; nothing is opened until ``acquire()``.

        Args:
            lock_path: Path of the file used as the flock target.
        """
        self._lock_path = lock_path
        # File descriptor that carries the flock while we hold it, else None.
        self._fd: int | None = None
        # Identity stamped into the lock file by the last successful acquire.
        self._engine_id: str | None = None

    @staticmethod
    def _stamp_owner(fd: int, engine_id: str) -> None:
        """Replace the lock file's content with ``engine_id``.

        Raw fd operations are used because the descriptor must stay open —
        closing it would release the flock. ftruncate clears any stale
        content from the previous holder, lseek rewinds, write stamps the id.
        """
        os.ftruncate(fd, 0)
        os.lseek(fd, 0, os.SEEK_SET)
        os.write(fd, engine_id.encode())

    async def acquire(
        self,
        engine_id: str,
        poll_interval: float = 0.1,
        timeout: float | None = None,
    ) -> None:
        """Acquire the exclusive flock via a non-blocking poll loop.

        Uses LOCK_NB so the calling thread never blocks inside flock();
        between attempts the coroutine sleeps for ``poll_interval`` seconds
        (default 100ms), which keeps the asyncio event loop responsive.

        NOTE: ``poll_interval`` precedes ``timeout`` in this signature, while
        the ``FailoverLock`` interface declares ``timeout`` as the second
        parameter — pass both by keyword to avoid mixing them up.

        Args:
            engine_id: Identity written into the lock file once acquired.
            poll_interval: Seconds to sleep between lock attempts.
            timeout: Maximum seconds to keep polling; ``None`` polls forever.
                It is only checked after a failed attempt, so at least one
                attempt is always made.

        Raises:
            FailoverLockError: On timeout, or on any unexpected error while
                polling for the lock.
            OSError: If the lock file cannot be opened or the owner stamp
                cannot be written (propagated unchanged).
        """
        if self._fd is not None:
            # We already hold the lock on this descriptor. Opening a second
            # descriptor would contend with our own lock (and orphan the held
            # one), so just refresh the owner stamp.
            self._engine_id = engine_id
            self._stamp_owner(self._fd, engine_id)
            logger.debug(
                "Failover lock already held by this instance; owner set to %s",
                engine_id,
            )
            return

        # O_CREAT: create the file if it doesn't exist
        # O_RDWR: open for reading and writing (flock requires a valid fd,
        #         and we write our engine_id into the file after acquiring)
        fd = os.open(self._lock_path, os.O_CREAT | os.O_RDWR)
        start = time.monotonic()
        locked = False
        try:
            while True:
                try:
                    # LOCK_EX: exclusive lock — only one holder at a time
                    # LOCK_NB: non-blocking — raises BlockingIOError instead
                    #          of blocking the calling thread
                    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    break
                except BlockingIOError:
                    if timeout is not None:
                        elapsed = time.monotonic() - start
                        if elapsed >= timeout:
                            raise FailoverLockError(
                                f"Timed out acquiring flock at {self._lock_path} "
                                f"for engine {engine_id} after {elapsed:.1f}s"
                            )
                    await asyncio.sleep(poll_interval)
            locked = True
        except Exception as e:
            logger.error(
                "Failed to acquire failover lock at %s for engine %s: %s",
                self._lock_path,
                engine_id,
                e,
            )
            raise FailoverLockError(
                f"Failed to acquire flock at {self._lock_path} for engine "
                f"{engine_id}: {e}"
            ) from e
        finally:
            # Runs for every exit path, including task cancellation
            # (CancelledError is a BaseException and is not caught by the
            # handler above), so the descriptor never leaks.
            if not locked:
                os.close(fd)

        self._fd = fd
        self._engine_id = engine_id
        # Write identity into the lock file for observability (owner() reads
        # this).
        self._stamp_owner(fd, engine_id)
        logger.info("Failover lock acquired: %s", engine_id)

    async def release(self) -> None:
        """Release the lock if this instance holds it; otherwise do nothing.

        The descriptor is always unlocked and closed, even if the diagnostic
        owner check below fails.
        """
        if self._fd is None:
            return
        fd, engine_id = self._fd, self._engine_id
        try:
            # Diagnostic only: the file should still carry our id. A mismatch
            # means someone else rewrote the file; log it for visibility.
            try:
                current = await self.owner()
                if current != engine_id:
                    logger.warning(
                        "Releasing lock but owner is %r, expected %r",
                        current,
                        engine_id,
                    )
            except OSError as e:
                logger.debug("Could not read owner during release: %s", e)
            logger.info("Failover lock released: %s", engine_id)
        finally:
            self._fd = None
            self._engine_id = None
            try:
                # Unlock explicitly: per flock(2) the lock belongs to the open
                # file description, so close() alone would not release it if a
                # forked child inherited this descriptor.
                fcntl.flock(fd, fcntl.LOCK_UN)
            except OSError as e:
                logger.debug("Explicit unlock failed, relying on close: %s", e)
            finally:
                os.close(fd)

    async def owner(self) -> str | None:
        """Return the id stamped in the lock file, or None if absent or empty.

        This reads file content only. The content is not cleared on release,
        so a returned id does not prove the lock is currently held.
        """
        try:
            # acquire() writes the id with str.encode() (UTF-8), so decode the
            # same way instead of depending on the locale encoding.
            with open(self._lock_path, "r", encoding="utf-8") as f:
                content = f.read().strip()
        except FileNotFoundError:
            return None
        return content or None
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Failover lock for GPU engine leader election.
In a failover deployment, there are multiple parallel engine processes ready to serve inference (warm standby engines).
The FailoverLock is used to determine which engine is active and serving inference while the others remain sleeping.
The lock couples two concerns:
1. Leader election — which engine is active
2. Resource safety — GPU memory is free for the new leader
Release happens on process death (implicit) or explicit call. By the
time a standby engine acquires, the old engine's GPU memory (KV cache,
CUDA contexts) has been reclaimed by the OS/driver.
"""
from abc import ABC, abstractmethod
class FailoverLockError(Exception):
    """Error type for failover lock operations that fail unexpectedly."""
class FailoverLock(ABC):
@abstractmethod
async def acquire(self, engine_id: str, timeout: float | None = None) -> None:
"""Block until this engine is granted the active role.
Args:
engine_id: Identity of the engine claiming the lock.
timeout: Maximum seconds to wait. None = wait forever.
"""
...
@abstractmethod
async def release(self) -> None:
"""Release the lock (give up the active role).
Called on graceful shutdown. Also happens implicitly on
process death (e.g kernel closes the file descriptor, releasing
the flock).
"""
...
@abstractmethod
async def owner(self) -> str | None:
"""Return the engine_id of the current lock holder, or None."""
...
......@@ -72,6 +72,8 @@ setup(
"gpu_memory_service.client",
"gpu_memory_service.client.torch",
"gpu_memory_service.client.torch.extensions",
"gpu_memory_service.failover_lock",
"gpu_memory_service.failover_lock.flock",
"gpu_memory_service.integrations",
"gpu_memory_service.integrations.common",
"gpu_memory_service.integrations.sglang",
......@@ -86,6 +88,8 @@ setup(
"gpu_memory_service.client": "client",
"gpu_memory_service.client.torch": "client/torch",
"gpu_memory_service.client.torch.extensions": "client/torch/extensions",
"gpu_memory_service.failover_lock": "failover_lock",
"gpu_memory_service.failover_lock.flock": "failover_lock/flock",
"gpu_memory_service.integrations": "integrations",
"gpu_memory_service.integrations.common": "integrations/common",
"gpu_memory_service.integrations.sglang": "integrations/sglang",
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests for the flock-based failover lock.
No GPU required — these are pure Python/OS tests exercising flock
semantics across asyncio tasks and child processes.
"""
import asyncio
import multiprocessing
import os
import signal
import time
import pytest
import pytest_asyncio # noqa: F401 — ensures the plugin is available
from gpu_memory_service.failover_lock.flock import FlockFailoverLock
@pytest.fixture
def lock_path(tmp_path):
    """Return, as a string, a lock file path inside the ``tmp_path`` fixture dir."""
    lock_file = tmp_path / "failover.lock"
    return str(lock_file)
# ── Test 1: basic acquire / release ──────────────────────────────────
@pytest.mark.asyncio
async def test_acquire_release(lock_path):
    """Acquire stamps the engine id and keeps an fd; release drops the fd."""
    failover_lock = FlockFailoverLock(lock_path)
    await failover_lock.acquire("engine-0")

    # The lock file carries the id of the engine that acquired it.
    with open(lock_path) as fh:
        stamped = fh.read().strip()
    assert stamped == "engine-0"

    # While held, the instance keeps its descriptor open.
    assert failover_lock._fd is not None

    await failover_lock.release()

    # After release the descriptor is gone.
    assert failover_lock._fd is None
# ── Test 2: two-engine contention ────────────────────────────────────
@pytest.mark.asyncio
async def test_two_engines_contention(lock_path):
    """Engine A holds lock. Engine B blocks. A releases. B acquires."""
    lock_a = FlockFailoverLock(lock_path)
    lock_b = FlockFailoverLock(lock_path)
    await lock_a.acquire("engine-a")
    b_acquired = asyncio.Event()

    async def acquire_b():
        await lock_b.acquire("engine-b", poll_interval=0.01)
        b_acquired.set()

    task_b = asyncio.create_task(acquire_b())
    try:
        # Give B a few poll cycles — it should NOT acquire
        await asyncio.sleep(0.1)
        assert not b_acquired.is_set()

        # Release A — B should acquire. Awaiting the task itself (instead of
        # only the event) surfaces any exception raised inside acquire_b.
        await lock_a.release()
        await asyncio.wait_for(task_b, timeout=2.0)
        assert b_acquired.is_set()

        # Lock file should now show engine-b
        with open(lock_path) as f:
            assert f.read().strip() == "engine-b"
    finally:
        # On failure the poller may still be running: stop it and wait for it
        # so no pending task outlives the test.
        if not task_b.done():
            task_b.cancel()
            try:
                await task_b
            except asyncio.CancelledError:
                pass
        # release() is a no-op for an instance that holds nothing.
        await lock_a.release()
        await lock_b.release()
# ── Test 3: process death releases lock ──────────────────────────────
def _child_acquire_and_hang(lock_path: str, ready_fd: int):
    """Child-process entry point: take the flock, notify the parent, then hang.

    ``ready_fd`` is a writable descriptor; one byte is written to it once the
    lock is held, after which it is closed.
    """
    import fcntl

    held_fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
    # Blocking exclusive lock; the descriptor is deliberately never closed.
    fcntl.flock(held_fd, fcntl.LOCK_EX)
    os.write(held_fd, b"child")

    # Tell the parent the lock is now held.
    os.write(ready_fd, b"1")
    os.close(ready_fd)

    # Sleep until the parent kills this process with SIGKILL.
    time.sleep(3600)
@pytest.mark.asyncio
async def test_process_death_releases(lock_path):
    """SIGKILL a child holding the lock. Parent should acquire."""
    import fcntl

    read_fd, write_fd = os.pipe()
    # The child receives write_fd as a bare integer, which is only meaningful
    # when it inherits the parent's descriptor table. That holds for the
    # "fork" start method only, so request it explicitly rather than relying
    # on the platform default.
    fork_ctx = multiprocessing.get_context("fork")
    child = fork_ctx.Process(
        target=_child_acquire_and_hang, args=(lock_path, write_fd)
    )
    child.start()
    os.close(write_fd)
    try:
        # Wait for child to signal it holds the lock. An empty read means the
        # child exited (closing its end of the pipe) without signalling.
        try:
            ready = os.read(read_fd, 1)
        finally:
            os.close(read_fd)
        assert ready == b"1", "child exited before signalling it holds the lock"

        # Child holds the lock — verify we can't acquire immediately
        lock = FlockFailoverLock(lock_path)
        fd_check = os.open(lock_path, os.O_RDWR)
        try:
            fcntl.flock(fd_check, fcntl.LOCK_EX | fcntl.LOCK_NB)
            pytest.fail("Should not have acquired — child holds the lock")
        except BlockingIOError:
            pass  # expected
        finally:
            os.close(fd_check)

        # Destroy the child process — kernel releases the flock
        os.kill(child.pid, signal.SIGKILL)
        child.join(timeout=5)
        assert not child.is_alive()

        # Now parent should acquire. The timeout turns a regression into a
        # test failure instead of an endless poll loop.
        await lock.acquire("parent", poll_interval=0.01, timeout=5.0)
        with open(lock_path) as f:
            assert f.read().strip() == "parent"
        await lock.release()
    finally:
        # Never leave the hour-long sleeper behind if an assertion failed.
        if child.is_alive():
            child.kill()
            child.join(timeout=5)
# ── Test 4: owner() ──────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_owner(lock_path):
    """owner() is None before any acquire, the id while held, and stays stale after release."""
    subject = FlockFailoverLock(lock_path)

    # Nothing has created the lock file yet.
    before_acquire = await subject.owner()
    assert before_acquire is None

    await subject.acquire("engine-x")
    while_held = await subject.owner()
    assert while_held == "engine-x"

    await subject.release()

    # The file is left in place with the old id: flock, not file content,
    # is the authority on who holds the lock.
    after_release = await subject.owner()
    assert after_release == "engine-x"
@pytest.mark.asyncio
async def test_owner_separate_instance(lock_path):
    """A second FlockFailoverLock on the same path can read the holder's id."""
    holder = FlockFailoverLock(lock_path)
    observer = FlockFailoverLock(lock_path)

    await holder.acquire("holder")
    seen_by_observer = await observer.owner()
    assert seen_by_observer == "holder"

    await holder.release()
# ── Test 5: cross-process race ───────────────────────────────────────
def _racer(lock_path: str, engine_id: str, result_queue: multiprocessing.Queue):
    """Take the lock, stamp ``engine_id``, report the wait, hold 0.2s, release.

    Puts ``{"engine_id": ..., "wait_s": ...}`` on ``result_queue``, where
    ``wait_s`` is the time spent inside the blocking flock call.
    """
    import fcntl

    lock_fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)

    wait_started = time.monotonic()
    fcntl.flock(lock_fd, fcntl.LOCK_EX)
    waited = time.monotonic() - wait_started

    # Overwrite whatever the previous holder left in the file.
    os.ftruncate(lock_fd, 0)
    os.lseek(lock_fd, 0, os.SEEK_SET)
    os.write(lock_fd, engine_id.encode())

    report = {"engine_id": engine_id, "wait_s": waited}
    result_queue.put(report)

    # Keep the lock for a short while, then let go by closing the descriptor.
    time.sleep(0.2)
    os.close(lock_fd)
@pytest.mark.asyncio
async def test_cross_process_race(lock_path):
    """Two processes race. Exactly one wins first, the other acquires after."""
    result_queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=_racer, args=(lock_path, "p1", result_queue))
    p2 = multiprocessing.Process(target=_racer, args=(lock_path, "p2", result_queue))
    p1.start()
    p2.start()
    try:
        # Drain the queue BEFORE joining: the multiprocessing docs warn that
        # joining a process which still has queued items can block, and that
        # Queue.empty() is not reliable. A blocking get() with a timeout
        # raises if a racer never reports.
        results = [result_queue.get(timeout=10) for _ in range(2)]
        p1.join(timeout=10)
        p2.join(timeout=10)
    finally:
        # Don't leave stuck racers behind if something above failed.
        for proc in (p1, p2):
            if proc.is_alive():
                proc.kill()
                proc.join(timeout=5)

    # Both racers ran to completion without crashing.
    assert p1.exitcode == 0
    assert p2.exitcode == 0

    # Sort by wait time — the one with shorter wait won the race
    results.sort(key=lambda r: r["wait_s"])
    winner = results[0]
    loser = results[1]
    # Winner acquired almost immediately
    assert winner["wait_s"] < 0.1
    # Loser had to wait (winner held for 0.2s)
    assert loser["wait_s"] >= 0.1
    # Both finished — both eventually acquired
    assert {r["engine_id"] for r in results} == {"p1", "p2"}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment