test_quiesce_resume.py 4.55 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import logging

import pytest

from tests.gpu_memory_service.common.runtime import (
    GMSProcessManager,
    SGLangWithGMSProcess,
    VLLMWithGMSProcess,
    get_gpu_memory_used,
)
from tests.gpu_memory_service.flow_assertions import (
    assert_completion_ok,
    assert_kv_history,
    assert_memory_restored_after_quiesce,
    assert_weights_published_once,
    quiesce_engine,
    wait_for_resumed_layout,
)
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME

# Module-level marks: pytest applies every mark in `pytestmark` to each test
# collected from this file, so both tests below are tagged `nightly` and
# `fault_tolerance` in addition to their own decorators.
pytestmark = [pytest.mark.nightly, pytest.mark.fault_tolerance]

# Event flow under test:
# 1. Weights are published once as a committed layout.
# 2. KV cache starts as a live RW layout build.
# 3. Quiesce keeps weights committed but aborts and clears the KV layout.
# 4. Resume reconnects weights as RO to the same committed layout.
# 5. Resume recreates KV cache in a fresh RW layout after the old one was cleared.

# Logger named after this module; the shared scenario uses it to report how
# much memory the quiesce step released.
logger = logging.getLogger(__name__)


def _run_quiesce_resume_test(
    request,
    engine_cls,
) -> None:
    """Run the shared quiesce/resume scenario against one engine backend.

    The scenario starts an engine under a ``GMSProcessManager``, confirms it
    serves a completion, quiesces it, checks the weights and KV-cache event
    histories, resumes it, re-checks memory and both histories, and finally
    confirms the engine serves a completion again.

    Args:
        request: The pytest ``request`` fixture, forwarded to
            ``GMSProcessManager``.
        engine_cls: Engine process class handed to ``GMSProcessManager``
            (the callers in this file pass ``VLLMWithGMSProcess`` or
            ``SGLangWithGMSProcess``).

    Raises:
        AssertionError: If any of the inline checks fail. The imported
            assertion helpers presumably raise the same way.
    """
    with GMSProcessManager(request, engine_cls) as gms:
        # Grab the handles first, in the same order as before, then launch.
        port = gms.frontend_port
        weights_server = gms.weights_gms
        kv_server = gms.kv_cache_gms
        worker = gms.start_engine("engine")

        # Sanity check: the engine must answer a request before we touch it.
        assert_completion_ok(
            port,
            "Hello",
            failure_message="Initial inference failed",
            success_message="Initial inference result",
        )

        # Going into quiesce, weights are expected to be published already and
        # readable by RO clients, while the KV cache is still a live RW layout
        # held by the engine. The helper returns the pre-quiesce weights
        # snapshot, the number of bytes released, and the memory reading taken
        # after quiesce.
        weights_snapshot, freed_bytes, mem_quiesced = quiesce_engine(
            weights_server,
            kv_server,
            worker,
            quiesce_label="Engine quiesce",
        )

        # Weights never change across quiesce/resume, so the history must
        # still show only the original publish (one connect, one commit).
        assert_weights_published_once(weights_server.get_event_history().events)

        # The KV cache behaves differently: quiesce has to abort the old RW
        # layout and clear its server-owned allocations before a new RW layout
        # can be started by resume.
        kv_history_quiesced = kv_server.get_event_history().events
        assert_kv_history(kv_history_quiesced, cleared_layouts=1)
        assert kv_history_quiesced[-1].allocation_count > 0

        resume_response = worker.resume()
        assert resume_response["status"] == "ok"

        mem_resumed = get_gpu_memory_used()
        assert_memory_restored_after_quiesce(
            "Memory after resume",
            mem_quiesced,
            mem_resumed,
            freed_bytes,
        )

        # After resume the weights are reattached read-only to the very same
        # committed layout, whereas the KV cache returns as a brand-new RW
        # layout with new allocations.
        wait_for_resumed_layout(
            weights_server,
            kv_server,
            weights_snapshot,
        )

        # Still exactly one publish on the weights side.
        assert_weights_published_once(weights_server.get_event_history().events)

        # On the KV side the earlier sequence is extended by a single new RW
        # connect, which happens only once the previous layout was cleared.
        kv_history_resumed = kv_server.get_event_history().events
        assert_kv_history(
            kv_history_resumed,
            cleared_layouts=1,
            suffix=["rw_connected"],
        )
        # NOTE(review): index 2 is presumably the same event that was checked
        # with [-1] before resume (i.e. the pre-resume history has three
        # entries) - confirm against the sequence assert_kv_history expects.
        assert kv_history_resumed[2].allocation_count > 0

        # The resumed engine must serve traffic again.
        assert_completion_ok(
            port,
            "Goodbye",
            failure_message="Post-resume inference failed",
            success_message="Post-resume inference result",
        )

        freed_mb = freed_bytes / (1 << 20)
        logger.info("Memory freed: %.0f MB", freed_mb)


@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(300)
@pytest.mark.vllm
def test_gms_basic_quiesce_resume_vllm(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
):
    """Run the shared quiesce/resume scenario with the vLLM engine process.

    ``runtime_services_dynamic_ports`` and ``predownload_models`` are not
    referenced in the body; they are requested so pytest sets those fixtures
    up before the scenario runs.
    """
    _run_quiesce_resume_test(request, engine_cls=VLLMWithGMSProcess)


@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(300)
@pytest.mark.sglang
def test_gms_basic_quiesce_resume_sglang(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
):
    """Run the shared quiesce/resume scenario with the SGLang engine process.

    ``runtime_services_dynamic_ports`` and ``predownload_models`` are not
    referenced in the body; they are requested so pytest sets those fixtures
    up before the scenario runs.
    """
    _run_quiesce_resume_test(request, engine_cls=SGLangWithGMSProcess)