Unverified Commit 49feb284 authored by Schwinn Saereesitthipitak's avatar Schwinn Saereesitthipitak Committed by GitHub
Browse files

feat(gms): exported GMS handles on allocation (#8108)

parent 8f4bd3b4
...@@ -65,7 +65,7 @@ The GMS server runs as an independent process that manages GPU memory without ev ...@@ -65,7 +65,7 @@ The GMS server runs as an independent process that manages GPU memory without ev
The server consists of three main components: The server consists of three main components:
1. **Memory Manager** - Allocates physical GPU memory via CUDA VMM (`cuMemCreate`) and exports shareable file descriptors (`cuMemExportToShareableHandle`). Critically, it never calls `cuMemMap` - clients handle all virtual address mapping. Allocation requests retry on OOM until they succeed or the optional retry timeout is reached. 1. **Memory Manager** - Allocates physical GPU memory via CUDA VMM (`cuMemCreate`) and eagerly exports one shareable file descriptor (`cuMemExportToShareableHandle`) per allocation. Later export RPCs `dup()` that cached FD instead of calling back into CUDA again. Critically, it never calls `cuMemMap` - clients handle all virtual address mapping. Allocation requests retry on OOM until they succeed or the optional retry timeout is reached.
2. **State Machine (FSM)** - Manages global lock state, waiter coordination, and disconnect cleanup. 2. **State Machine (FSM)** - Manages global lock state, waiter coordination, and disconnect cleanup.
...@@ -106,14 +106,15 @@ sequenceDiagram ...@@ -106,14 +106,15 @@ sequenceDiagram
C->>S: AllocateRequest(size, tag) C->>S: AllocateRequest(size, tag)
S->>GPU: cuMemCreate(size) S->>GPU: cuMemCreate(size)
GPU-->>S: handle GPU-->>S: handle
S->>GPU: cuMemExportToShareableHandle(handle)
GPU-->>S: cached fd
S-->>C: AllocateResponse(allocation_id) S-->>C: AllocateResponse(allocation_id)
end end
%% Export/Import (Both Writer and Reader) %% Export/Import (Both Writer and Reader)
Note over C,GPU: Both Writer and Reader: Export and map Note over C,GPU: Both Writer and Reader: Export and map
C->>S: ExportAllocationRequest(allocation_id) C->>S: ExportAllocationRequest(allocation_id)
S->>GPU: cuMemExportToShareableHandle(handle) S->>S: dup(cached fd)
GPU-->>S: fd
S-->>C: Response + fd (via SCM_RIGHTS) S-->>C: Response + fd (via SCM_RIGHTS)
C->>GPU: cuMemImportFromShareableHandle(fd) C->>GPU: cuMemImportFromShareableHandle(fd)
...@@ -194,6 +195,8 @@ flowchart LR ...@@ -194,6 +195,8 @@ flowchart LR
- `RW_CONNECT` starts a fresh RW layout build. - `RW_CONNECT` starts a fresh RW layout build.
- `RW_COMMIT` publishes the current layout; it does not create another one. - `RW_COMMIT` publishes the current layout; it does not create another one.
- `RW_ABORT` discards the current RW layout and returns the system to `EMPTY`. - `RW_ABORT` discards the current RW layout and returns the system to `EMPTY`.
- `RW -> EMPTY` does not require allocating a new layout first; it happens immediately when the writer drops the session before commit.
- There is no RPC that clears the active RW layout while keeping the same writer session alive. To abandon a partially built RW layout, the writer must disconnect or call `abort()`, and any later RW build starts from a fresh `RW_CONNECT`.
- Allocations and metadata live in one flat store that is cleared on `RW_CONNECT` and `RW_ABORT`. - Allocations and metadata live in one flat store that is cleared on `RW_CONNECT` and `RW_ABORT`.
- RO requests are served only from the committed layout, while RW requests mutate only the active layout. - RO requests are served only from the committed layout, while RW requests mutate only the active layout.
- Read RPCs (`export`, allocation lookup/listing, metadata lookup/listing) operate on that single live store. This is safe because the FSM prevents RW and RO sessions from coexisting. - Read RPCs (`export`, allocation lookup/listing, metadata lookup/listing) operate on that single live store. This is safe because the FSM prevents RW and RO sessions from coexisting.
...@@ -386,10 +389,16 @@ sequenceDiagram ...@@ -386,10 +389,16 @@ sequenceDiagram
S-->>C: HandshakeResponse(granted=RO, committed=true) S-->>C: HandshakeResponse(granted=RO, committed=true)
Note over P: Subsequent process - import from GMS Note over P: Subsequent process - import from GMS
else RW held by another else RW held by another
S->>S: Wait until a committed layout becomes available S->>S: Wait for current writer to either commit or abort
S->>S: Then grant RO from COMMITTED alt current writer commits
S->>S: Grant RO from COMMITTED
S-->>C: HandshakeResponse(granted=RO, committed=true) S-->>C: HandshakeResponse(granted=RO, committed=true)
Note over P: Wait for writer to publish committed weights Note over P: Import published weights
else current writer aborts
S->>S: Grant RW from EMPTY
S-->>C: HandshakeResponse(granted=RW, committed=false)
Note over P: Previous writer gave up; this process becomes the writer
end
end end
``` ```
...@@ -529,7 +538,7 @@ GMS provides pre-built integrations for vLLM and SGLang. Enable GMS by passing ` ...@@ -529,7 +538,7 @@ GMS provides pre-built integrations for vLLM and SGLang. Enable GMS by passing `
When `--load-format gms` is set: When `--load-format gms` is set:
1. **A GMS server must already be running** for the target GPU device. The engine connects to it via a Unix socket derived from the GPU UUID. 1. **A GMS server must already be running** for the target GPU device. The engine connects to it via a Unix socket derived from the GPU UUID.
2. The engine uses `RW_OR_RO` mode by default: if no committed layout exists and no writer holds the lock, the first process gets RW and loads weights from disk. Otherwise clients wait for a committed layout and then get RO to import published weights. 2. The engine uses `RW_OR_RO` mode by default: if no committed layout exists and no writer holds the lock, the first process gets RW and loads weights from disk. If another writer is already active, later clients wait until that writer either commits or aborts; after a commit they get RO to import published weights, and after an abort one of them can become the new RW writer.
3. Both weights and KV cache are managed by GMS, but they use separate tags: 3. Both weights and KV cache are managed by GMS, but they use separate tags:
- `weights`: publish/import flow (`RW_OR_RO`, then `RO` after commit) - `weights`: publish/import flow (`RW_OR_RO`, then `RO` after commit)
- `kv_cache`: separate RW-only tag for mutable KV-cache memory - `kv_cache`: separate RW-only tag for mutable KV-cache memory
......
...@@ -7,6 +7,7 @@ from __future__ import annotations ...@@ -7,6 +7,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import os
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Callable, Optional from typing import Callable, Optional
...@@ -30,6 +31,7 @@ class AllocationInfo: ...@@ -30,6 +31,7 @@ class AllocationInfo:
size: int size: int
aligned_size: int aligned_size: int
handle: int handle: int
export_fd: int
tag: str tag: str
layout_slot: int layout_slot: int
created_at: float created_at: float
...@@ -130,11 +132,13 @@ class GMSAllocationManager: ...@@ -130,11 +132,13 @@ class GMSAllocationManager:
) )
await asyncio.sleep(self._allocation_retry_interval) await asyncio.sleep(self._allocation_retry_interval)
export_fd = int(cumem_export_to_shareable_handle(int(handle)))
info = AllocationInfo( info = AllocationInfo(
allocation_id=str(uuid4()), allocation_id=str(uuid4()),
size=size, size=size,
aligned_size=aligned_size, aligned_size=aligned_size,
handle=int(handle), handle=int(handle),
export_fd=export_fd,
tag=tag, tag=tag,
layout_slot=self._next_layout_slot, layout_slot=self._next_layout_slot,
created_at=time.time(), created_at=time.time(),
...@@ -152,14 +156,14 @@ class GMSAllocationManager: ...@@ -152,14 +156,14 @@ class GMSAllocationManager:
return info return info
def export_allocation(self, allocation_id: str) -> int: def export_allocation(self, allocation_id: str) -> int:
return cumem_export_to_shareable_handle( info = self.get_allocation(allocation_id)
self.get_allocation(allocation_id).handle return os.dup(info.export_fd)
)
def free_allocation(self, allocation_id: str) -> bool: def free_allocation(self, allocation_id: str) -> bool:
info = self._allocations.get(allocation_id) info = self._allocations.get(allocation_id)
if info is None: if info is None:
return False return False
os.close(info.export_fd)
cumem_release(info.handle) cumem_release(info.handle)
self._allocations.pop(allocation_id, None) self._allocations.pop(allocation_id, None)
logger.debug("Freed allocation: %s", allocation_id) logger.debug("Freed allocation: %s", allocation_id)
...@@ -169,6 +173,7 @@ class GMSAllocationManager: ...@@ -169,6 +173,7 @@ class GMSAllocationManager:
allocation_ids = list(self._allocations) allocation_ids = list(self._allocations)
for allocation_id in allocation_ids: for allocation_id in allocation_ids:
info = self._allocations[allocation_id] info = self._allocations[allocation_id]
os.close(info.export_fd)
cumem_release(info.handle) cumem_release(info.handle)
self._allocations.pop(allocation_id, None) self._allocations.pop(allocation_id, None)
if allocation_ids: if allocation_ids:
......
...@@ -742,6 +742,55 @@ def test_disconnect_during_allocation_retry_aborts_writer_and_unblocks_next_writ ...@@ -742,6 +742,55 @@ def test_disconnect_during_allocation_retry_aborts_writer_and_unblocks_next_writ
assert isinstance(result.get("error"), ConnectionError) assert isinstance(result.get("error"), ConnectionError)
@pytest.mark.asyncio
async def test_allocation_manager_caches_exported_fd(monkeypatch):
export_calls = 0
monkeypatch.setattr(server_allocations, "cuda_ensure_initialized", lambda: None)
monkeypatch.setattr(
server_allocations,
"cumem_get_allocation_granularity",
lambda device: 4096,
)
monkeypatch.setattr(
server_allocations,
"cumem_create_tolerate_oom",
lambda size, device: (True, 4242),
)
monkeypatch.setattr(server_allocations, "cumem_release", lambda handle: None)
def export_fd(handle: int) -> int:
nonlocal export_calls
export_calls += 1
read_fd, write_fd = os.pipe()
os.close(write_fd)
return read_fd
monkeypatch.setattr(
server_allocations, "cumem_export_to_shareable_handle", export_fd
)
allocations = GMSAllocationManager(device=0)
info = await allocations.allocate(size=4096, tag="weights")
first_fd = allocations.export_allocation(info.allocation_id)
second_fd = allocations.export_allocation(info.allocation_id)
try:
assert export_calls == 1
assert info.export_fd >= 0
assert first_fd != info.export_fd
assert second_fd != info.export_fd
assert first_fd != second_fd
os.fstat(first_fd)
os.fstat(second_fd)
finally:
os.close(first_fd)
os.close(second_fd)
assert allocations.free_allocation(info.allocation_id)
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.timeout(180) @pytest.mark.timeout(180)
async def test_new_layout_large_allocation_waits_for_dead_writer_process( async def test_new_layout_large_allocation_waits_for_dead_writer_process(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment