Unverified commit 1c65a588, authored by mohammedabdulwahhab, committed by GitHub
Browse files

feat(gms): e2e failover support for vLLM (#6818)


Signed-off-by: mohammedabdulwahhab <furkhan324@berkeley.edu>
parent 8eb79979
...@@ -42,6 +42,9 @@ class Config(DynamoRuntimeConfig, DynamoVllmConfig): ...@@ -42,6 +42,9 @@ class Config(DynamoRuntimeConfig, DynamoVllmConfig):
enable_local_indexer: bool = True enable_local_indexer: bool = True
use_kv_events: bool use_kv_events: bool
# GMS configuration
gms_shadow_mode: bool = False
# mirror vLLM # mirror vLLM
model: str model: str
served_model_name: Optional[str] = None served_model_name: Optional[str] = None
...@@ -126,6 +129,13 @@ def cross_validate_config( ...@@ -126,6 +129,13 @@ def cross_validate_config(
"bypassing vLLM's OutputProcessor buffering." "bypassing vLLM's OutputProcessor buffering."
) )
# Validate --gms-shadow-mode requires --load-format gms
if dynamo_config.gms_shadow_mode and engine_config.load_format != "gms":
raise ValueError(
"--gms-shadow-mode requires --load-format gms. "
"Shadow mode depends on GMS for VA-stable weight sharing."
)
def update_dynamo_config_with_engine( def update_dynamo_config_with_engine(
dynamo_config: Config, engine_config: AsyncEngineArgs dynamo_config: Config, engine_config: AsyncEngineArgs
......
...@@ -157,6 +157,20 @@ class DynamoVllmArgGroup(ArgGroup): ...@@ -157,6 +157,20 @@ class DynamoVllmArgGroup(ArgGroup):
"Required when using --load-format=mx-source or --load-format=mx-target.", "Required when using --load-format=mx-source or --load-format=mx-target.",
) )
# GMS (GPU Memory Service) shadow mode
add_negatable_bool_argument(
g,
flag_name="--gms-shadow-mode",
env_var="DYN_VLLM_GMS_SHADOW_MODE",
default=False,
help=(
"Enable GMS shadow/standby mode. Shadow engines skip KV cache "
"allocation at startup, automatically sleep after initialization, "
"and wake on demand when the active engine dies. "
"Requires --load-format=gms."
),
)
# @dataclass() # @dataclass()
class DynamoVllmConfig(ConfigBase): class DynamoVllmConfig(ConfigBase):
...@@ -187,6 +201,9 @@ class DynamoVllmConfig(ConfigBase): ...@@ -187,6 +201,9 @@ class DynamoVllmConfig(ConfigBase):
# ModelExpress P2P # ModelExpress P2P
model_express_url: Optional[str] = None model_express_url: Optional[str] = None
# GMS shadow mode
gms_shadow_mode: bool = False
def validate(self) -> None: def validate(self) -> None:
"""Validate vLLM wrapper configuration.""" """Validate vLLM wrapper configuration."""
self._resolve_disaggregation_mode() self._resolve_disaggregation_mode()
......
...@@ -81,6 +81,26 @@ def run_dynamo_headless(config: Config) -> None: ...@@ -81,6 +81,26 @@ def run_dynamo_headless(config: Config) -> None:
Secondary nodes spawn vLLM workers only — no engine core, no scheduler, Secondary nodes spawn vLLM workers only — no engine core, no scheduler,
no Dynamo endpoints. Bypasses DistributedRuntime entirely (no NATS/etcd). no Dynamo endpoints. Bypasses DistributedRuntime entirely (no NATS/etcd).
""" """
# Propagate worker_cls for custom load formats so headless workers use
# the same model loader and patches as the leader node.
if config.engine_args.load_format == "gms":
config.engine_args.worker_cls = (
"gpu_memory_service.integrations.vllm.worker.GMSWorker"
)
if config.gms_shadow_mode:
from gpu_memory_service.integrations.vllm.utils import (
configure_gms_lock_mode,
validate_cudagraph_mode,
)
os.environ["DYN_GMS_SHADOW_MODE"] = "1"
configure_gms_lock_mode(config.engine_args)
validate_cudagraph_mode(config.engine_args)
elif config.engine_args.load_format in ("mx-source", "mx-target"):
config.engine_args.worker_cls = "modelexpress.vllm_worker.ModelExpressWorker"
# Keep the upstream CLI import local so tests that only exercise # Keep the upstream CLI import local so tests that only exercise
# build_headless_namespace() do not pull in vLLM's full CLI import graph. # build_headless_namespace() do not pull in vLLM's full CLI import graph.
from vllm.entrypoints.cli.serve import run_headless from vllm.entrypoints.cli.serve import run_headless
...@@ -445,6 +465,21 @@ def setup_vllm_engine( ...@@ -445,6 +465,21 @@ def setup_vllm_engine(
if engine_args.load_format == "gms": if engine_args.load_format == "gms":
engine_args.worker_cls = "gpu_memory_service.integrations.vllm.worker.GMSWorker" engine_args.worker_cls = "gpu_memory_service.integrations.vllm.worker.GMSWorker"
if config.gms_shadow_mode:
from gpu_memory_service.integrations.vllm.utils import (
configure_gms_lock_mode,
validate_cudagraph_mode,
)
os.environ["DYN_GMS_SHADOW_MODE"] = "1"
logger.info(
"[Shadow] Enabled shadow mode: will skip KV cache allocation at startup"
)
# ENGINE_ID=0 writes weights, all others import (RO).
# Prevents deadlock during TP>1 failover.
configure_gms_lock_mode(engine_args)
validate_cudagraph_mode(engine_args)
if engine_args.load_format in ("mx-source", "mx-target"): if engine_args.load_format in ("mx-source", "mx-target"):
try: try:
from modelexpress import register_modelexpress_loaders from modelexpress import register_modelexpress_loaders
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
import asyncio import asyncio
import logging import logging
import os
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from typing import Any, Optional from typing import Any, Optional
...@@ -514,6 +515,27 @@ class WorkerFactory: ...@@ -514,6 +515,27 @@ class WorkerFactory:
"The chat template will be loaded but the /v1/chat/completions endpoint will not be available." "The chat template will be loaded but the /v1/chat/completions endpoint will not be available."
) )
if config.gms_shadow_mode:
# Shadow mode: lock-driven activation.
# Flow: sleep → startup probe passes → block on lock → wake → register.
await handler.sleep({"level": 1})
runtime.set_health_status(True)
logger.info(
"[Shadow] Engine sleeping, startup probe now passing, waiting for lock"
)
from gpu_memory_service.failover_lock.flock import FlockFailoverLock
lock_path = os.environ.get("FAILOVER_LOCK_PATH", "/shared/failover.lock")
engine_id = os.environ.get("ENGINE_ID", "0")
lock = FlockFailoverLock(lock_path)
await lock.acquire(engine_id=f"engine-{engine_id}")
logger.info("[Shadow] Lock acquired, waking engine")
await handler.wake_up({})
logger.info("[Shadow] Engine awake, registering with discovery")
await self.register_vllm_model( await self.register_vllm_model(
model_input, model_input,
model_type, model_type,
......
...@@ -780,6 +780,17 @@ impl DistributedRuntime { ...@@ -780,6 +780,17 @@ impl DistributedRuntime {
Ok(()) Ok(())
} }
/// Set the system-level health status (Ready / NotReady).
///
/// `ready == true` selects `config::HealthStatus::Ready`; `false` selects
/// `config::HealthStatus::NotReady`. The chosen status is stored through the
/// guard obtained from `system_health().lock()`.
///
/// This method has no failure path of its own and always returns `Ok(())`.
fn set_health_status(&self, ready: bool) -> PyResult<()> {
let status = if ready {
config::HealthStatus::Ready
} else {
config::HealthStatus::NotReady
};
self.inner.system_health().lock().set_health_status(status);
Ok(())
}
// This is used to pass the DistributedRuntime from the dynamo-runtime bindings // This is used to pass the DistributedRuntime from the dynamo-runtime bindings
// to the KVBM bindings, since KVBM cannot directly use the struct from this cdylib. // to the KVBM bindings, since KVBM cannot directly use the struct from this cdylib.
// TODO: Create a separate crate "dynamo-python" so that all binding crates can import // TODO: Create a separate crate "dynamo-python" so that all binding crates can import
......
...@@ -93,6 +93,12 @@ class DistributedRuntime: ...@@ -93,6 +93,12 @@ class DistributedRuntime:
""" """
... ...
def set_health_status(self, ready: bool) -> None:
    """
    Explicitly set the system-level health status (Ready / NotReady).

    Args:
        ready: ``True`` marks the system Ready; ``False`` marks it NotReady.
    """
    ...
def register_engine_route( def register_engine_route(
self, self,
route_name: str, route_name: str,
......
...@@ -49,10 +49,24 @@ def register_gms_loader(load_format: str = "gms") -> None: ...@@ -49,10 +49,24 @@ def register_gms_loader(load_format: str = "gms") -> None:
class GMSModelLoader(BaseModelLoader): class GMSModelLoader(BaseModelLoader):
"""vLLM model loader that loads weights via GPU Memory Service.""" """vLLM model loader that loads weights via GPU Memory Service."""
# Keys in model_loader_extra_config that are GMS-specific and should
# not be passed to the fallback DefaultModelLoader.
_GMS_EXTRA_KEYS = frozenset({"gms_read_only"})
def __init__(self, load_config): def __init__(self, load_config):
super().__init__(load_config) super().__init__(load_config)
# Strip GMS-specific keys before creating the fallback loader,
# otherwise DefaultModelLoader rejects unknown extra config.
extra = getattr(load_config, "model_loader_extra_config", None) or {}
clean_extra = {
k: v for k, v in extra.items() if k not in self._GMS_EXTRA_KEYS
}
self.default_loader = DefaultModelLoader( self.default_loader = DefaultModelLoader(
replace(load_config, load_format="auto") replace(
load_config,
load_format="auto",
model_loader_extra_config=clean_extra,
)
) )
def download_model(self, model_config) -> None: def download_model(self, model_config) -> None:
......
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""GMS model runner subclass for shadow mode.
Allows KV cache allocation to be skipped while a shadow engine initializes.
During failover scenarios, multiple engines run on the same device.
Each engine should allocate its own KV cache only while it is the active/leader engine.
"""
from __future__ import annotations
import logging
import time
import torch
from vllm.v1.worker.gpu_model_runner import GPUModelRunner
logger = logging.getLogger(__name__)
class GMSShadowModelRunner(GPUModelRunner):
    """GPUModelRunner variant that carries the shadow-mode overrides.

    Installed by GMSWorker.init_device() through a ``__class__`` swap on an
    already-built runner, so every piece of state used here is created lazily
    by the methods below.
    """

    @property
    def in_shadow_init(self) -> bool:
        """Whether the shadow init phase (KV cache skipped) is still active."""
        # The flag only exists after enter_shadow_init() has run; a missing
        # attribute is treated as "not in shadow init".
        return getattr(self, "_shadow_init_phase", False)

    def enter_shadow_init(self) -> None:
        """Begin the shadow init phase; KV cache allocation is skipped."""
        self._shadow_init_phase = True
        logger.info("[Shadow] Entered shadow init phase")

    def exit_shadow_init(self) -> None:
        """End the shadow init phase; KV cache allocation proceeds normally."""
        self._shadow_init_phase = False
        logger.info("[Shadow] Exited shadow init phase")

    def initialize_kv_cache_tensors(self, kv_cache_config, kernel_block_sizes):
        """Allocate KV cache tensors, or defer allocation during shadow init.

        While in shadow init, both arguments are remembered for
        allocate_kv_cache_on_wake() and an empty dict is returned instead of
        allocating anything. Otherwise the parent implementation runs.
        """
        if not self.in_shadow_init:
            return super().initialize_kv_cache_tensors(
                kv_cache_config, kernel_block_sizes
            )

        self._shadow_kv_cache_config = kv_cache_config
        self._shadow_kernel_block_sizes = kernel_block_sizes
        logger.info("[Shadow] Init phase: stored config, skipping KV cache allocation")
        return {}

    def _get_slot_mappings(self, *args, **kwargs):
        """Return ``(None, None)`` while no KV caches exist.

        _dummy_run() calls this unconditionally during warmup. With no KV
        tensors there is nothing to index into, so the call degrades to a
        graceful no-op; once caches exist the parent implementation is used.
        """
        if self.kv_caches:
            return super()._get_slot_mappings(*args, **kwargs)
        return None, None

    def _check_and_update_cudagraph_mode(self, attention_backends, kv_cache_groups):
        """Pin the cudagraph mode to a shadow-compatible value.

        vLLM's default resolution may escalate to FULL_AND_PIECEWISE. This
        override skips that resolution: NONE (enforce_eager) is kept as is,
        anything else is forced to PIECEWISE. Both arguments are unused.
        """
        from vllm.config import CUDAGraphMode

        if self.compilation_config.cudagraph_mode == CUDAGraphMode.NONE:
            # enforce_eager: keep NONE, only the dispatcher keys are set up.
            resolved = CUDAGraphMode.NONE
        else:
            # Default shadow path: clamp to PIECEWISE before key setup.
            resolved = CUDAGraphMode.PIECEWISE
            self.compilation_config.cudagraph_mode = resolved

        self.cudagraph_dispatcher.initialize_cudagraph_keys(
            resolved, self.uniform_decode_query_len
        )

    def allocate_kv_cache_on_wake(self) -> dict:
        """Allocate the KV cache using the config stored during shadow init.

        Called by GMSWorker.wake_up() after the shadow init phase has been
        exited. If the device does not yet have enough free memory, polls
        every 0.5s for up to 60s before giving up.

        Returns:
            The mapping produced by initialize_kv_cache_tensors().

        Raises:
            AssertionError: if no config was stored during shadow init.
            RuntimeError: if enough GPU memory is not free within 60s.
        """
        assert hasattr(
            self, "_shadow_kv_cache_config"
        ), "_shadow_kv_cache_config not set — was enter_shadow_init() called?"
        assert hasattr(
            self, "_shadow_kernel_block_sizes"
        ), "_shadow_kernel_block_sizes not set — was enter_shadow_init() called?"

        gib = 1 << 30
        timeout_s = 60.0

        # OOM remediation during failover: wait for the dying engine to release memory.
        # TODO: This will be replaced with a barrier in GMS when we manage kv cache there instead
        stored_config = self._shadow_kv_cache_config
        needed_bytes = sum(t.size for t in stored_config.kv_cache_tensors)
        available_bytes, _ = torch.cuda.mem_get_info()

        if available_bytes < needed_bytes:
            logger.info(
                "[Shadow] Waiting for GPU memory (need %.2f GiB, free %.2f GiB)",
                needed_bytes / gib,
                available_bytes / gib,
            )
            deadline = time.monotonic() + timeout_s
            last_log = time.monotonic()
            while available_bytes < needed_bytes:
                if time.monotonic() > deadline:
                    raise RuntimeError(
                        f"Timed out waiting for GPU memory: "
                        f"need {needed_bytes / gib:.2f} GiB, "
                        f"free {available_bytes / gib:.2f} GiB"
                    )
                now = time.monotonic()
                # Progress is reported at most once every five seconds.
                if now - last_log >= 5.0:
                    logger.info(
                        "[Shadow] Still waiting for GPU memory: "
                        "need %.2f GiB, free %.2f GiB "
                        "(%.0fs elapsed, %.0fs remaining)",
                        needed_bytes / gib,
                        available_bytes / gib,
                        now - (deadline - timeout_s),
                        deadline - now,
                    )
                    last_log = now
                time.sleep(0.5)
                available_bytes = torch.cuda.mem_get_info()[0]
            logger.info(
                "[Shadow] GPU memory available (free %.2f GiB), proceeding",
                available_bytes / gib,
            )

        logger.info("[Shadow] Allocating KV cache on wake")

        from vllm.config import set_current_vllm_config

        with set_current_vllm_config(self.vllm_config):
            kv_caches = self.initialize_kv_cache_tensors(
                stored_config,
                self._shadow_kernel_block_sizes,
            )

        # Re-register with KV transfer group (skipped at init since kv_caches was {}).
        # Mirrors GPUModelRunner.initialize_kv_cache() — update if upstream changes.
        try:
            from vllm.distributed.kv_transfer.kv_connector.v1.base import (
                get_kv_transfer_group,
                has_kv_transfer_group,
            )

            if has_kv_transfer_group() and kv_caches:
                get_kv_transfer_group().register_kv_caches(kv_caches)
                logger.debug("[Shadow] Registered KV caches with transfer group")
        except ImportError:
            logger.debug("[Shadow] KV transfer group not available")

        allocated_bytes = sum(
            tensor.numel() * tensor.element_size() for tensor in kv_caches.values()
        )
        logger.info(
            "[Shadow] Allocated KV cache on wake: %.2f GiB (%d tensors)",
            allocated_bytes / gib,
            len(kv_caches),
        )
        return kv_caches
...@@ -16,14 +16,22 @@ import logging ...@@ -16,14 +16,22 @@ import logging
from gpu_memory_service import get_gms_client_memory_manager from gpu_memory_service import get_gms_client_memory_manager
from gpu_memory_service.common.types import GrantedLockType from gpu_memory_service.common.types import GrantedLockType
from gpu_memory_service.integrations.vllm.utils import is_shadow_mode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_memory_snapshot_patched = False _memory_snapshot_patched = False
_request_memory_patched = False
_register_kv_caches_patched = False
# =============================================================================
# Core GMS patch (always applied)
# =============================================================================
def patch_memory_snapshot() -> None: def patch_memory_snapshot() -> None:
"""Patch MemorySnapshot.measure to add committed bytes to free_memory.""" """Add committed GMS bytes to MemorySnapshot.free_memory"""
global _memory_snapshot_patched global _memory_snapshot_patched
if _memory_snapshot_patched: if _memory_snapshot_patched:
...@@ -66,3 +74,81 @@ def patch_memory_snapshot() -> None: ...@@ -66,3 +74,81 @@ def patch_memory_snapshot() -> None:
MemorySnapshot.measure = patched_measure MemorySnapshot.measure = patched_measure
_memory_snapshot_patched = True _memory_snapshot_patched = True
logger.info("[GMS Patch] Patched MemorySnapshot.measure") logger.info("[GMS Patch] Patched MemorySnapshot.measure")
# =============================================================================
# Shadow mode patches
# =============================================================================
def patch_request_memory() -> None:
    """Replace vLLM's ``request_memory`` with a version that skips the free-memory check.

    A shadow engine shares its GPU with the active engine, so the usual
    "free >= requested" check is bypassed. The replacement returns
    ``total_memory * gpu_memory_utilization`` truncated to an int and logs the
    free memory it ignored. The patch is applied at most once per process and
    is silently skipped when ``vllm.v1.worker.utils`` cannot be imported.
    """
    global _request_memory_patched

    if _request_memory_patched:
        return

    try:
        from vllm.v1.worker import utils as worker_utils
    except ImportError:
        logger.debug("[GMS Patch] vllm.v1.worker.utils not available")
        return

    gib = 1 << 30

    def patched_request_memory(init_snapshot, cache_config):
        # Same budget formula as the log line reports; no comparison against
        # init_snapshot.free_memory is made.
        budget_bytes = int(
            init_snapshot.total_memory * cache_config.gpu_memory_utilization
        )
        logger.info(
            "[GMS Patch] Shadow mode: bypassing memory check "
            "(requested=%.2f GiB, free=%.2f GiB)",
            budget_bytes / gib,
            init_snapshot.free_memory / gib,
        )
        return budget_bytes

    worker_utils.request_memory = patched_request_memory
    _request_memory_patched = True
    logger.info("[GMS Patch] Patched request_memory for shadow mode")
def patch_register_kv_caches() -> None:
    """Make ``NixlConnector.register_kv_caches`` a no-op for empty ``kv_caches``.

    The wrapper forwards non-empty ``kv_caches`` to the original method and
    otherwise logs and returns ``None``. The patch is applied at most once per
    process and is silently skipped when NixlConnector cannot be imported.
    """
    global _register_kv_caches_patched

    if _register_kv_caches_patched:
        return

    try:
        from vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector import (
            NixlConnector,
        )
    except ImportError:
        logger.debug("[GMS Patch] NixlConnector not available")
        return

    # Captured before the attribute is overwritten so the wrapper can delegate.
    original_register = NixlConnector.register_kv_caches

    def patched_register_kv_caches(self, kv_caches):
        if kv_caches:
            return original_register(self, kv_caches)
        logger.info("[GMS Patch] Skipping KV cache registration (empty kv_caches)")
        return None

    NixlConnector.register_kv_caches = patched_register_kv_caches
    _register_kv_caches_patched = True
    logger.info("[GMS Patch] Patched NixlConnector.register_kv_caches")
# =============================================================================
# Patch application helper
# =============================================================================
def apply_shadow_mode_patches() -> None:
    """Install the shadow-mode monkey-patches when shadow mode is enabled.

    Does nothing unless is_shadow_mode() is true. Otherwise applies
    patch_request_memory() and patch_register_kv_caches(), in that order.
    """
    if is_shadow_mode():
        patch_request_memory()
        patch_register_kv_caches()
        logger.info("[GMS Patch] Shadow mode patches applied")
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Shadow mode utilities for GMS vLLM integration."""
import logging
import os
logger = logging.getLogger(__name__)
def is_shadow_mode() -> bool:
    """Report whether this process runs as a GMS shadow engine.

    Shadow mode is signalled through the ``DYN_GMS_SHADOW_MODE`` environment
    variable. Only the exact value ``"1"`` enables it; an unset variable or
    any other value counts as disabled.
    """
    flag = os.environ.get("DYN_GMS_SHADOW_MODE", "0")
    return flag == "1"
def validate_cudagraph_mode(engine_args) -> None:
    """Check, and if unset choose, the cudagraph mode for a shadow engine.

    An unset mode is defaulted to PIECEWISE (attention stubbed during graph
    capture). PIECEWISE and NONE (e.g. enforce_eager) are accepted unchanged.
    FULL variants are rejected because they need KV cache tensors that do not
    exist during shadow init.

    Args:
        engine_args: object whose ``compilation_config`` is a vLLM
            ``CompilationConfig``; its ``cudagraph_mode`` may be updated.

    Raises:
        AssertionError: if ``compilation_config`` is not a CompilationConfig.
        ValueError: if the configured mode is neither PIECEWISE nor NONE.
    """
    from vllm.config import CompilationConfig, CUDAGraphMode

    compilation = engine_args.compilation_config
    assert isinstance(compilation, CompilationConfig), (
        f"Expected CompilationConfig, got {type(compilation).__name__}. "
        f"vLLM's arg parsing may have changed."
    )

    if compilation.cudagraph_mode is None:
        compilation.cudagraph_mode = CUDAGraphMode.PIECEWISE
        logger.info("[Shadow] cudagraph_mode defaulted to PIECEWISE")
        return

    if compilation.cudagraph_mode in (CUDAGraphMode.PIECEWISE, CUDAGraphMode.NONE):
        # Already shadow-compatible; nothing to change.
        return

    raise ValueError(
        f"Shadow mode requires PIECEWISE or NONE cudagraph mode, "
        f"got {compilation.cudagraph_mode.name}. FULL modes capture attention ops "
        f"that need KV cache tensors, which don't exist during shadow init."
    )
def configure_gms_lock_mode(engine_args) -> None:
    """Derive ``gms_read_only`` in ``model_loader_extra_config`` from ENGINE_ID.

    In a failover setup with TP>1, only ENGINE_ID="0" loads weights from disk
    (RW_OR_RO); every other engine imports them from GMS (RO). This avoids a
    deadlock in which several engines try to take RW locks across TP ranks at
    the same time and block each other indefinitely.

    ENGINE_ID defaults to "0" when the environment variable is unset. For
    non-primary engines ``gms_read_only=True`` is written into the extra
    config (creating the dict if it was empty or None).

    Args:
        engine_args: object exposing a ``model_loader_extra_config`` attribute.

    Raises:
        ValueError: if a user-supplied ``gms_read_only`` contradicts the role
            implied by ENGINE_ID.
    """
    engine_id = os.environ.get("ENGINE_ID", "0")
    loader_extra = engine_args.model_loader_extra_config or {}
    explicit_read_only = loader_extra.get("gms_read_only", None)

    if engine_id == "0":
        # Primary writer: only an explicit read-only request is a conflict.
        if explicit_read_only:
            raise ValueError(
                "ENGINE_ID=0 is the primary writer but "
                "gms_read_only=True was explicitly set. "
                "The primary engine must be able to write weights."
            )
        return

    # Non-primary engines must be read-only; an explicit falsy value conflicts.
    if explicit_read_only is not None and not explicit_read_only:
        raise ValueError(
            f"ENGINE_ID={engine_id} requires gms_read_only=True, "
            f"but gms_read_only=False was explicitly set."
        )
    loader_extra["gms_read_only"] = True
    engine_args.model_loader_extra_config = loader_extra
...@@ -13,6 +13,7 @@ Usage: ...@@ -13,6 +13,7 @@ Usage:
from __future__ import annotations from __future__ import annotations
import logging import logging
import sys
from contextlib import nullcontext from contextlib import nullcontext
from typing import List, Optional from typing import List, Optional
...@@ -21,23 +22,31 @@ from gpu_memory_service import ( ...@@ -21,23 +22,31 @@ from gpu_memory_service import (
get_gms_client_memory_manager, get_gms_client_memory_manager,
get_or_create_gms_client_memory_manager, get_or_create_gms_client_memory_manager,
) )
from gpu_memory_service.client.memory_manager import StaleMemoryLayoutError
from gpu_memory_service.common.types import RequestedLockType from gpu_memory_service.common.types import RequestedLockType
from gpu_memory_service.common.utils import get_socket_path from gpu_memory_service.common.utils import get_socket_path
from gpu_memory_service.integrations.common import patch_empty_cache from gpu_memory_service.integrations.common import patch_empty_cache
from gpu_memory_service.integrations.common.utils import get_gms_lock_mode from gpu_memory_service.integrations.common.utils import get_gms_lock_mode
from gpu_memory_service.integrations.vllm.model_loader import register_gms_loader from gpu_memory_service.integrations.vllm.model_loader import register_gms_loader
from gpu_memory_service.integrations.vllm.patches import patch_memory_snapshot from gpu_memory_service.integrations.vllm.patches import (
apply_shadow_mode_patches,
patch_memory_snapshot,
)
from gpu_memory_service.integrations.vllm.utils import is_shadow_mode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Trigger model loader registration and utility patches on import # Trigger model loader registration and utility patches on import
register_gms_loader() register_gms_loader()
# Apply core utility patches (always needed for GMS)
patch_empty_cache() patch_empty_cache()
patch_memory_snapshot() patch_memory_snapshot()
logger.info( # Apply shadow mode patches if shadow mode is enabled
"[GMS] Worker module loaded - model loader registered, utility patches applied" apply_shadow_mode_patches()
)
logger.info("[GMS] Worker module loaded - model loader registered, all patches applied")
# Import Worker after patches are applied # Import Worker after patches are applied
from vllm.v1.worker.gpu_worker import Worker # noqa: E402 from vllm.v1.worker.gpu_worker import Worker # noqa: E402
...@@ -59,21 +68,63 @@ class GMSWorker(Worker): ...@@ -59,21 +68,63 @@ class GMSWorker(Worker):
current_platform.set_device(torch.device(f"cuda:{device}")) current_platform.set_device(torch.device(f"cuda:{device}"))
# Establish weights GMS connection (so MemorySnapshot can query committed bytes). # Establish weights GMS connection (so MemorySnapshot can query committed bytes).
# Fetch extra config from vLLM load_config to determine RW/RO lock mode. # Lock type is determined by model_loader_extra_config, set upstream by
# configure_gms_lock_mode() in main.py.
socket_path = get_socket_path(device)
extra = ( extra = (
getattr(self.vllm_config.load_config, "model_loader_extra_config", {}) or {} getattr(self.vllm_config.load_config, "model_loader_extra_config", {}) or {}
) )
socket_path = get_socket_path(device) mode = get_gms_lock_mode(extra)
get_or_create_gms_client_memory_manager( get_or_create_gms_client_memory_manager(
socket_path, socket_path,
device, device,
mode=get_gms_lock_mode(extra), mode=mode,
tag="weights", tag="weights",
) )
# Parent will set device again (harmless) and do memory checks # Parent will set device again (harmless) and do memory checks
super().init_device() super().init_device()
# __class__ swap: preserves object identity so vLLM's internal
# references see our overrides.
if is_shadow_mode() and hasattr(self, "model_runner"):
from gpu_memory_service.integrations.vllm.model_runner import (
GMSShadowModelRunner,
)
self.model_runner.__class__ = GMSShadowModelRunner
self.model_runner.enter_shadow_init()
logger.info("[GMS] Injected GMSShadowModelRunner via __class__ swap")
def determine_available_memory(self) -> int:
    """Estimate the memory this engine can use.

    Outside shadow mode the parent implementation is used unchanged.

    In shadow mode this may run while an active engine is colocated on the
    same device, and that engine's KV cache allocation should not count
    against us. The estimate is therefore ``requested_memory`` minus the peak
    torch allocation observed during a profile run.

    Returns:
        The projected available memory in bytes, as an int.
    """
    if is_shadow_mode():
        # TODO: Need a more robust way for shadow engines to profile memory while they are sharing GPUs with other engines.
        # For now this gets the job done.
        torch.cuda.reset_peak_memory_stats()
        self.model_runner.profile_run()
        torch.cuda.synchronize()
        peak_non_kv_bytes = torch.cuda.max_memory_allocated()

        budget_bytes = self.requested_memory - peak_non_kv_bytes
        gib = 1 << 30
        logger.info(
            "[GMS] Shadow mode: projected available memory "
            "%.2f GiB (requested=%.2f GiB, non_kv=%.2f GiB)",
            budget_bytes / gib,
            self.requested_memory / gib,
            peak_non_kv_bytes / gib,
        )
        return int(budget_bytes)

    return super().determine_available_memory()
def load_model(self, *args, **kwargs) -> None: def load_model(self, *args, **kwargs) -> None:
"""Load model with corrected memory accounting. """Load model with corrected memory accounting.
...@@ -105,6 +156,23 @@ class GMSWorker(Worker): ...@@ -105,6 +156,23 @@ class GMSWorker(Worker):
except Exception as e: except Exception as e:
logger.debug("[GMS] Could not correct memory accounting: %s", e) logger.debug("[GMS] Could not correct memory accounting: %s", e)
def initialize_from_config(self, kv_cache_config) -> None:
    """Initialize from config, then assert the resolved cudagraph mode.

    vLLM can try to upgrade the cudagraph mode in certain scenarios. After the
    parent initialization has run, shadow engines verify that the final
    resolved mode is still shadow-compatible, i.e. PIECEWISE or NONE.

    Args:
        kv_cache_config: forwarded unchanged to the parent implementation.

    Raises:
        RuntimeError: in shadow mode, if the resolved cudagraph mode is
            neither PIECEWISE nor NONE.
    """
    super().initialize_from_config(kv_cache_config)

    if not is_shadow_mode():
        return

    from vllm.config import CUDAGraphMode

    mode = self.model_runner.compilation_config.cudagraph_mode
    if mode not in (CUDAGraphMode.PIECEWISE, CUDAGraphMode.NONE):
        # The message names both accepted modes so it matches the check above
        # (NONE is valid too, e.g. with enforce_eager).
        raise RuntimeError(
            f"Shadow mode requires PIECEWISE or NONE cudagraph mode after resolution, "
            f"but got {mode.name}. vLLM's config resolution overrode it."
        )
def sleep(self, level: int = 1) -> None: def sleep(self, level: int = 1) -> None:
""" """
vLLM sleep implementation with GMS integration. vLLM sleep implementation with GMS integration.
...@@ -112,6 +180,10 @@ class GMSWorker(Worker): ...@@ -112,6 +180,10 @@ class GMSWorker(Worker):
NOTE: `level` is a no-op here: weights are only unmapped (but remain in GPU memory). NOTE: `level` is a no-op here: weights are only unmapped (but remain in GPU memory).
NOTE: We do NOT call super().sleep() because it tries to copy GPU buffers to CPU, NOTE: We do NOT call super().sleep() because it tries to copy GPU buffers to CPU,
which segfaults on already-unmapped GMS memory. which segfaults on already-unmapped GMS memory.
Handles two cases for KV cache:
1. Normal: KV cache was allocated, sleep via CuMemAllocator
2. Shadow: KV cache was skipped at startup, nothing to do
""" """
free_bytes_before = torch.cuda.mem_get_info()[0] free_bytes_before = torch.cuda.mem_get_info()[0]
...@@ -122,11 +194,16 @@ class GMSWorker(Worker): ...@@ -122,11 +194,16 @@ class GMSWorker(Worker):
manager.unmap_all_vas() manager.unmap_all_vas()
manager.disconnect() manager.disconnect()
# Sleep KV cache via CuMemAllocator # Sleep KV cache via CuMemAllocator (discard, no CPU backup)
# If KV cache was never allocated (shadow engine mode), this is a no-op
from vllm.device_allocator.cumem import CuMemAllocator from vllm.device_allocator.cumem import CuMemAllocator
allocator = CuMemAllocator.get_instance() kv_caches = getattr(self.model_runner, "kv_caches", None)
allocator.sleep(offload_tags=tuple()) if kv_caches:
allocator = CuMemAllocator.get_instance()
allocator.sleep(offload_tags=tuple())
else:
logger.info("[GMS] KV cache not allocated (shadow mode), skipping sleep")
free_bytes_after, total = torch.cuda.mem_get_info() free_bytes_after, total = torch.cuda.mem_get_info()
freed_bytes = free_bytes_after - free_bytes_before freed_bytes = free_bytes_after - free_bytes_before
...@@ -138,7 +215,18 @@ class GMSWorker(Worker): ...@@ -138,7 +215,18 @@ class GMSWorker(Worker):
) )
def wake_up(self, tags: Optional[List[str]] = None) -> None: def wake_up(self, tags: Optional[List[str]] = None) -> None:
"""vLLM wake implementation with GMS integration.""" """vLLM wake implementation with GMS integration.
Handles two cases for KV cache:
1. Normal: KV cache was allocated at startup, reallocate via CuMemAllocator
2. Shadow: KV cache was skipped at startup, allocate via allocate_kv_cache_on_wake()
"""
if (
hasattr(self.model_runner, "exit_shadow_init")
and self.model_runner.in_shadow_init
):
self.model_runner.exit_shadow_init()
if tags is None: if tags is None:
tags = ["weights", "kv_cache"] tags = ["weights", "kv_cache"]
...@@ -146,14 +234,41 @@ class GMSWorker(Worker): ...@@ -146,14 +234,41 @@ class GMSWorker(Worker):
manager = get_gms_client_memory_manager() manager = get_gms_client_memory_manager()
assert manager is not None, "GMS client is not initialized" assert manager is not None, "GMS client is not initialized"
assert manager.is_unmapped, "GMS weights are not unmapped" assert manager.is_unmapped, "GMS weights are not unmapped"
manager.connect(RequestedLockType.RO)
manager.remap_all_vas()
if "kv_cache" in tags: # These errors are fatal and unrecoverable in a worker subprocess:
from vllm.device_allocator.cumem import CuMemAllocator # the worker cannot serve requests without weights. sys.exit(1)
# ensures clean termination so the orchestrator (K8s) can restart.
try:
manager.connect(RequestedLockType.RO, timeout_ms=30_000)
manager.remap_all_vas()
except TimeoutError:
logger.error(
"Fatal: timed out waiting for GMS RO lock during remap "
"(GMS may be down or RW lock held indefinitely)"
)
sys.exit(1)
except StaleMemoryLayoutError as e:
logger.error(
"Fatal: weight layout changed while unmapped, cannot remap: %s", e
)
sys.exit(1)
except ConnectionError as e:
logger.error("Fatal: cannot connect to GMS during remap: %s", e)
sys.exit(1)
allocator = CuMemAllocator.get_instance() if "kv_cache" in tags:
allocator.wake_up(tags=["kv_cache"]) # Check if KV cache was skipped at startup (shadow engine mode)
kv_caches = getattr(self.model_runner, "kv_caches", None)
if not kv_caches:
logger.info("[GMS] KV cache not allocated - allocating on wake")
self.model_runner.allocate_kv_cache_on_wake()
logger.info("[GMS] Successfully allocated KV cache on wake")
else:
# Normal case: KV cache was allocated, reallocate via CuMemAllocator
from vllm.device_allocator.cumem import CuMemAllocator
allocator = CuMemAllocator.get_instance()
allocator.wake_up(tags=["kv_cache"])
# Reinitialize FP8 KV scales if needed # Reinitialize FP8 KV scales if needed
if self.cache_config.cache_dtype.startswith("fp8") and hasattr( if self.cache_config.cache_dtype.startswith("fp8") and hasattr(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment