# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations import logging import time from contextlib import ExitStack from typing import Callable import pytest from gpu_memory_service.server.fsm import ServerState from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME from tests.utils.managed_process import DynamoFrontendProcess, ManagedProcess from ..harness.gms import GMSServerProcess from ..harness.runtime import ( MIN_EXPECTED_MEMORY_RETURN_FRACTION, get_gpu_memory_used, send_completion, ) from ..harness.sglang import SGLangWithGMSProcess from ..harness.vllm import VLLMWithGMSProcess pytestmark = [pytest.mark.nightly] # Event flow under test: # 1. Weights are published once as a committed layout. # 2. KV cache starts as a live RW layout build. # 3. Sleep keeps weights committed but aborts and clears the KV layout. # 4. Wake reconnects weights as RO to the same committed layout. # 5. Wake recreates KV cache in a fresh RW layout after the old one was cleared. logger = logging.getLogger(__name__) def _run_sleep_wake_test( request, ports: dict, make_engine: Callable[[], ManagedProcess], ) -> None: with ExitStack() as stack: weights_gms = stack.enter_context( GMSServerProcess(request, device=0, tag="weights") ) kv_cache_gms = stack.enter_context( GMSServerProcess(request, device=0, tag="kv_cache") ) stack.enter_context( DynamoFrontendProcess(request, frontend_port=ports["frontend"]) ) with make_engine() as engine: result = send_completion(ports["frontend"]) logger.info("Initial inference result: %s", result) assert result["choices"] # Before sleep, weights must already be published and visible to RO # readers while KV cache remains a live RW layout owned by the engine. deadline = time.monotonic() + 30.0 while True: weights_before_sleep = weights_gms.get_runtime_state() kv_before_sleep = kv_cache_gms.get_runtime_state() if ( weights_before_sleep.state == ServerState.RO and weights_before_sleep.allocation_count > 0 and weights_before_sleep.memory_layout_hash and kv_before_sleep.state == ServerState.RW and kv_before_sleep.allocation_count > 0 ): break if time.monotonic() > deadline: raise TimeoutError("initial GMS state did not stabilize") time.sleep(0.1) mem_before = get_gpu_memory_used() logger.info("Memory before sleep: %.0f MB", mem_before / (1 << 20)) sleep_result = engine.sleep() assert sleep_result["status"] == "ok" mem_after_sleep = get_gpu_memory_used() released_bytes = mem_before - mem_after_sleep logger.info("Memory after sleep: %.0f MB", mem_after_sleep / (1 << 20)) assert mem_after_sleep < mem_before, "Sleep should reduce memory" assert released_bytes > 0 # Sleep preserves the committed weights layout but aborts and clears the # mutable KV-cache layout, which is what should release GPU memory. deadline = time.monotonic() + 30.0 while True: weights_after_sleep = weights_gms.get_runtime_state() kv_after_sleep = kv_cache_gms.get_runtime_state() if ( weights_after_sleep.state == ServerState.COMMITTED and weights_after_sleep.allocation_count == weights_before_sleep.allocation_count and weights_after_sleep.memory_layout_hash == weights_before_sleep.memory_layout_hash and kv_after_sleep.state == ServerState.EMPTY and kv_after_sleep.allocation_count == 0 ): break if time.monotonic() > deadline: raise TimeoutError( "sleep did not drive GMS into the expected state" ) time.sleep(0.1) # Weights are immutable across sleep/wake, so their event history should # still be the original publish: connect once, commit once. weights_events = weights_gms.get_event_history().events assert [event.kind for event in weights_events] == [ "rw_connected", "committed", ] # KV cache is different: sleep must abort the old RW layout and clear its # server-owned allocations before wake can start a new RW layout. kv_events = kv_cache_gms.get_event_history().events assert [event.kind for event in kv_events] == [ "rw_connected", "rw_aborted", "allocations_cleared", ] assert kv_events[-1].allocation_count > 0 wake_result = engine.wake() assert wake_result["status"] == "ok" mem_after_wake = get_gpu_memory_used() reacquired_bytes = mem_after_wake - mem_after_sleep logger.info("Memory after wake: %.0f MB", mem_after_wake / (1 << 20)) assert mem_after_wake > mem_after_sleep, "Wake should reacquire memory" assert ( reacquired_bytes ) >= released_bytes * MIN_EXPECTED_MEMORY_RETURN_FRACTION # Wake reconnects weights as RO to the same committed layout, but KV cache # must come back as a fresh RW layout with new allocations. deadline = time.monotonic() + 30.0 while True: weights_after_wake = weights_gms.get_runtime_state() kv_after_wake = kv_cache_gms.get_runtime_state() if ( weights_after_wake.state == ServerState.RO and weights_after_wake.allocation_count == weights_before_sleep.allocation_count and weights_after_wake.memory_layout_hash == weights_before_sleep.memory_layout_hash and kv_after_wake.state == ServerState.RW and kv_after_wake.allocation_count > 0 ): break if time.monotonic() > deadline: raise TimeoutError("wake did not restore the expected GMS state") time.sleep(0.1) weights_events_after_wake = weights_gms.get_event_history().events assert [event.kind for event in weights_events_after_wake] == [ "rw_connected", "committed", ] # The wake history should therefore extend the old KV sequence with one # new RW connect after the previous layout was fully cleared. kv_events_after_wake = kv_cache_gms.get_event_history().events assert [event.kind for event in kv_events_after_wake] == [ "rw_connected", "rw_aborted", "allocations_cleared", "rw_connected", ] assert kv_events_after_wake[2].allocation_count > 0 result = send_completion(ports["frontend"], "Goodbye") logger.info("Post-wake inference result: %s", result) assert result["choices"] logger.info( "Memory freed: %.0f MB", (mem_before - mem_after_sleep) / (1 << 20) ) @pytest.mark.vllm @pytest.mark.e2e @pytest.mark.gpu_1 @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) @pytest.mark.timeout(300) def test_gms_basic_sleep_wake_vllm( request, runtime_services_dynamic_ports, gms_ports, predownload_models, ): ports = gms_ports _run_sleep_wake_test( request, ports, make_engine=lambda: VLLMWithGMSProcess( request, "engine", ports["shadow_system"], ports["shadow_kv_event"], ports["shadow_nixl"], ports["frontend"], ), ) @pytest.mark.skip(reason="Nightly CI failure: https://linear.app/nvidia/issue/DYN-2567") @pytest.mark.sglang @pytest.mark.e2e @pytest.mark.gpu_1 @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME) @pytest.mark.timeout(300) def test_gms_basic_sleep_wake_sglang( request, runtime_services_dynamic_ports, gms_ports, predownload_models, ): ports = gms_ports _run_sleep_wake_test( request, ports, make_engine=lambda: SGLangWithGMSProcess( request, "engine", ports["shadow_system"], ports["shadow_sglang"], ports["frontend"], ), )