Unverified commit c69e19e8, authored by Schwinn Saereesitthipitak and committed by GitHub
Browse files

fix(tests): keep shadows cold before failover quiesce (#8258)


Signed-off-by: Schwinn Saereesitthipitak <schwinns@nvidia.com>
parent 595ff0fa
......@@ -9,11 +9,9 @@ import json
import logging
import os
import sys
import time
from abc import ABC, abstractmethod
from contextlib import ExitStack
import pynvml
import requests
from tests.gpu_memory_service.common.gms import GMSServer
......@@ -26,32 +24,6 @@ from tests.utils.port_utils import allocate_ports, deallocate_ports
logger = logging.getLogger(__name__)
def get_gpu_memory_used(device: int = 0) -> int:
    """Return the ``used`` memory figure that NVML reports for one GPU.

    NVML is initialised just for this query and shut down again before the
    function returns, including when the query itself raises.

    Args:
        device: NVML index of the GPU to inspect.

    Returns:
        The ``used`` field of the device's NVML memory-info record.
    """
    pynvml.nvmlInit()
    try:
        memory_info = pynvml.nvmlDeviceGetMemoryInfo(
            pynvml.nvmlDeviceGetHandleByIndex(device)
        )
        return memory_info.used
    finally:
        # Always pair nvmlInit() with nvmlShutdown(), even on failure.
        pynvml.nvmlShutdown()
def wait_for_memory_drop(
    baseline_bytes: int,
    *,
    timeout_s: float = 30.0,
    poll_interval_s: float = 0.5,
) -> int:
    """Poll GPU memory usage until it falls below *baseline_bytes*.

    Args:
        baseline_bytes: Usage level the current reading must drop under.
        timeout_s: Maximum time to keep polling, in seconds.
        poll_interval_s: Sleep between consecutive samples, in seconds.

    Returns:
        The most recent usage sample. If the timeout expires first, this is
        simply the last sample taken and may still be at or above
        *baseline_bytes*; callers must compare it themselves.
    """
    give_up_at = time.monotonic() + timeout_s
    used_bytes = get_gpu_memory_used()
    # The deadline is checked before the reading, so an expired timeout
    # returns the latest sample without another sleep.
    while time.monotonic() < give_up_at and used_bytes >= baseline_bytes:
        time.sleep(poll_interval_s)
        used_bytes = get_gpu_memory_used()
    return used_bytes
class GMSProcessManager:
"""Start the shared GMS daemons and frontend for one test scenario."""
......@@ -314,7 +286,7 @@ class VLLMWithGMSProcess(GMSEngineProcess):
"--max-num-seqs",
"1",
"--gpu-memory-utilization",
"0.9",
"0.8",
"--kv-events-config",
kv_events_cfg,
]
......@@ -360,7 +332,7 @@ class TRTLLMWithGMSProcess(GMSEngineProcess):
read_only_weights: bool = False,
override_engine_args: str | None = None,
):
reserved_ports = allocate_ports(1)
reserved_ports = allocate_ports(1, DefaultPort.SYSTEM1.value)
self._override_engine_args = override_engine_args
try:
super().__init__(
......@@ -461,7 +433,7 @@ class SGLangWithGMSProcess(GMSEngineProcess):
"--enable-memory-saver",
"--disable-cuda-graph",
"--mem-fraction-static",
"0.9",
"0.8",
"--port",
str(self.serve_port),
]
......
......@@ -9,15 +9,12 @@ import time
import requests
from gpu_memory_service.server.fsm import ServerState
from tests.gpu_memory_service.common.runtime import get_gpu_memory_used
from tests.utils.client import send_request
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.payloads import CompletionPayload
logger = logging.getLogger(__name__)
MIN_EXPECTED_MEMORY_RESTORE_FRACTION = 0.9
def assert_completion_ok(
frontend_port: int,
......@@ -60,25 +57,6 @@ def assert_completion_ok(
time.sleep(retry_interval)
def assert_memory_restored_after_quiesce(
    label: str,
    quiesced_memory: int,
    active_memory: int,
    released_bytes: int,
    *,
    min_fraction: float = MIN_EXPECTED_MEMORY_RESTORE_FRACTION,
) -> None:
    """Assert that GPU memory grew back after a quiesce.

    Logs the active footprint and the amount restored, then checks that the
    active reading is above the quiesced reading and that the restored amount
    covers at least ``min_fraction`` of what the quiesce released.

    Args:
        label: Prefix used in the log line and in failure messages.
        quiesced_memory: Memory reading taken while the engine was quiesced.
        active_memory: Memory reading taken once the engine is active again.
        released_bytes: Amount of memory the quiesce freed.
        min_fraction: Minimum share of ``released_bytes`` that must be
            restored for the check to pass.

    Raises:
        AssertionError: If memory did not grow, or grew by less than the
            required fraction of the released amount.
    """
    restored_bytes = active_memory - quiesced_memory
    logger.info(
        "%s: %.2f GiB (restored %.0f MB)",
        label,
        active_memory / (1 << 30),
        restored_bytes / (1 << 20),
    )

    # Explicit messages: this helper is not a test module, so a bare assert
    # here may fail without showing the values that were compared.
    assert active_memory > quiesced_memory, (
        f"{label}: active memory ({active_memory} bytes) is not above the "
        f"quiesced level ({quiesced_memory} bytes)"
    )

    required_bytes = released_bytes * min_fraction
    assert restored_bytes >= required_bytes, (
        f"{label}: restored {restored_bytes} bytes, expected at least "
        f"{required_bytes:.0f} bytes ({min_fraction:.0%} of the "
        f"{released_bytes} bytes released)"
    )
def quiesce_engine(
weights_gms,
kv_cache_gms,
......@@ -93,22 +71,11 @@ def quiesce_engine(
expected_weights_hash=expected_weights_hash,
)
memory_before_quiesce = get_gpu_memory_used()
assert engine.quiesce()["status"] == "ok"
memory_after_quiesce = get_gpu_memory_used()
released_bytes = memory_before_quiesce - memory_after_quiesce
logger.info(
"%s: %.2f -> %.2f GiB (freed %.0f MB)",
quiesce_label,
memory_before_quiesce / (1 << 30),
memory_after_quiesce / (1 << 30),
released_bytes / (1 << 20),
)
assert memory_after_quiesce < memory_before_quiesce
assert released_bytes > 0
logger.info("%s completed", quiesce_label)
wait_for_quiesced_layout(weights_gms, kv_cache_gms, weights_state)
return weights_state, released_bytes, memory_after_quiesce
return weights_state
def wait_for_active_layout(
......
......@@ -3,8 +3,6 @@
from __future__ import annotations
import logging
import pytest
from gpu_memory_service.server.fsm import ServerState
......@@ -13,12 +11,10 @@ from tests.gpu_memory_service.common.runtime import (
SGLangWithGMSProcess,
TRTLLMWithGMSProcess,
VLLMWithGMSProcess,
get_gpu_memory_used,
)
from tests.gpu_memory_service.flow_assertions import (
assert_completion_ok,
assert_kv_history,
assert_memory_restored_after_quiesce,
assert_weights_published_once,
quiesce_engine,
wait_for_resumed_layout,
......@@ -35,8 +31,6 @@ pytestmark = [pytest.mark.nightly, pytest.mark.fault_tolerance]
# 4. Resume reconnects weights as RO to the same committed layout.
# 5. Resume recreates KV cache in a fresh RW layout after the old one was cleared.
logger = logging.getLogger(__name__)
def _run_quiesce_resume_test(
request,
......@@ -56,7 +50,7 @@ def _run_quiesce_resume_test(
# Before quiesce, weights must already be published and visible to RO
# readers while KV cache remains a live RW layout owned by the engine.
weights_before_quiesce, released_bytes, mem_after_quiesce = quiesce_engine(
weights_before_quiesce = quiesce_engine(
weights_gms,
kv_cache_gms,
engine,
......@@ -77,14 +71,6 @@ def _run_quiesce_resume_test(
resume_result = engine.resume()
assert resume_result["status"] == "ok"
mem_after_resume = get_gpu_memory_used()
assert_memory_restored_after_quiesce(
"Memory after resume",
mem_after_quiesce,
mem_after_resume,
released_bytes,
)
# Resume reconnects weights as RO to the same committed layout, but KV cache
# must come back as a fresh RW layout with new allocations.
wait_for_resumed_layout(
......@@ -113,8 +99,6 @@ def _run_quiesce_resume_test(
success_message="Post-resume inference result",
)
logger.info("Memory freed: %.0f MB", released_bytes / (1 << 20))
@pytest.mark.e2e
@pytest.mark.gpu_1
......@@ -173,17 +157,7 @@ def test_gms_basic_quiesce_resume_trtllm(
ws = wait_for_weights_state(weights_gms, ServerState.RO, timeout=60.0)
weights_hash = ws.memory_layout_hash
mem_before = get_gpu_memory_used()
assert engine.quiesce()["status"] == "ok"
mem_after = get_gpu_memory_used()
released = mem_before - mem_after
logger.info(
"TRT-LLM quiesce: %.2f -> %.2f GiB (freed %.0f MB)",
mem_before / (1 << 30),
mem_after / (1 << 30),
released / (1 << 20),
)
assert released > 0
wait_for_weights_state(
weights_gms, ServerState.COMMITTED, expected_hash=weights_hash
......@@ -191,14 +165,6 @@ def test_gms_basic_quiesce_resume_trtllm(
assert_weights_published_once(weights_gms.get_event_history().events)
assert engine.resume()["status"] == "ok"
mem_resumed = get_gpu_memory_used()
assert_memory_restored_after_quiesce(
"TRT-LLM resume",
mem_after,
mem_resumed,
released,
min_fraction=0.6,
)
wait_for_weights_state(weights_gms, ServerState.RO, expected_hash=weights_hash)
assert_weights_published_once(weights_gms.get_event_history().events)
......@@ -209,7 +175,6 @@ def test_gms_basic_quiesce_resume_trtllm(
failure_message="Post-resume inference failed",
success_message="Post-resume inference OK",
)
logger.info("Memory freed: %.0f MB", released / (1 << 20))
@pytest.mark.trtllm
......
......@@ -17,12 +17,10 @@ from tests.gpu_memory_service.common.runtime import (
SGLangWithGMSProcess,
TRTLLMWithGMSProcess,
VLLMWithGMSProcess,
get_gpu_memory_used,
)
from tests.gpu_memory_service.flow_assertions import (
assert_completion_ok,
assert_kv_history,
assert_memory_restored_after_quiesce,
assert_weights_published_once,
quiesce_engine,
wait_for_active_layout,
......@@ -35,8 +33,8 @@ from tests.utils.managed_process import ManagedProcess
pytestmark = [pytest.mark.nightly, pytest.mark.fault_tolerance]
# Event flow under test:
# 1. Shadow A starts as the initial weights publisher, then quiesces.
# 2. Shadow B starts in read-only mode from the committed weights layout, then quiesces.
# 1. Shadow A starts as the initial weights publisher, then quiesces without serving traffic.
# 2. Shadow B starts in read-only mode from the committed weights layout, then quiesces without serving traffic.
# 3. Primary starts in read-only mode and owns the next RW KV layout.
# 4. Shadow A tries to resume while primary still owns the KV-cache RW layout.
# 5. Primary is SIGKILLed; the old KV session clears before its GPU memory is reclaimed.
......@@ -51,7 +49,6 @@ def _kill_process_group(process: ManagedProcess) -> None:
logger.warning("kill process group: no PID available")
return
memory_before_kill = get_gpu_memory_used()
try:
os.killpg(os.getpgid(pid), signal.SIGKILL)
except ProcessLookupError:
......@@ -63,13 +60,6 @@ def _kill_process_group(process: ManagedProcess) -> None:
except ChildProcessError:
pass
memory_after_kill = get_gpu_memory_used()
logger.info(
"Primary kill snapshot: %.2f -> %.2f GiB",
memory_before_kill / (1 << 30),
memory_after_kill / (1 << 30),
)
def _start_primary(
manager,
......@@ -78,8 +68,6 @@ def _start_primary(
kv_cache_gms,
*,
weights_hash: str,
quiesced_memory_after_shadow_b: int,
shadow_b_released_bytes: int,
):
primary = manager.start_engine("primary", read_only_weights=True)
assert_completion_ok(
......@@ -89,14 +77,6 @@ def _start_primary(
success_message="Primary inference OK",
)
primary_memory_in_use = get_gpu_memory_used()
assert_memory_restored_after_quiesce(
"Primary active memory",
quiesced_memory_after_shadow_b,
primary_memory_in_use,
shadow_b_released_bytes,
)
weights_with_primary, _ = wait_for_active_layout(
weights_gms,
kv_cache_gms,
......@@ -144,6 +124,7 @@ def _resume_shadow_after_primary_failover(
kv_cache_gms,
primary: ManagedProcess,
):
resume_timeout_s = 300
expected_kv_kinds_while_blocked = [
"rw_connected",
"rw_aborted",
......@@ -151,7 +132,7 @@ def _resume_shadow_after_primary_failover(
] * 3 + ["rw_connected", "allocation_oom"]
with ThreadPoolExecutor(max_workers=1) as executor:
resume_future = executor.submit(shadow.resume, 180)
resume_future = executor.submit(shadow.resume, resume_timeout_s)
deadline = time.monotonic() + 10.0
while time.monotonic() < deadline:
if resume_future.done():
......@@ -187,7 +168,7 @@ def _resume_shadow_after_primary_failover(
else:
raise TimeoutError("shadow did not reacquire KV cache after failover")
return resume_future.result(timeout=180)
return resume_future.result(timeout=resume_timeout_s)
def _run_shadow_failover_test(
......@@ -202,17 +183,7 @@ def _run_shadow_failover_test(
shadow_a = manager.start_engine(
"shadow-a",
)
assert_completion_ok(
frontend_port,
"Hello",
failure_message="Shadow inference failed",
success_message="Shadow inference OK",
)
(
weights_state_after_shadow_a,
_,
_,
) = quiesce_engine(
weights_state_after_shadow_a = quiesce_engine(
weights_gms,
kv_cache_gms,
shadow_a,
......@@ -223,17 +194,7 @@ def _run_shadow_failover_test(
"shadow-b",
read_only_weights=True,
)
assert_completion_ok(
frontend_port,
"Hello",
failure_message="Shadow inference failed",
success_message="Shadow inference OK",
)
(
weights_state_after_shadow_b,
shadow_b_released_bytes,
quiesced_memory_after_shadow_b,
) = quiesce_engine(
weights_state_after_shadow_b = quiesce_engine(
weights_gms,
kv_cache_gms,
shadow_b,
......@@ -248,16 +209,12 @@ def _run_shadow_failover_test(
kv_events_after_shadow_quiesce = kv_cache_gms.get_event_history().events
assert_kv_history(kv_events_after_shadow_quiesce, cleared_layouts=2)
# Later engines import the committed weights layout read-only, so
# compare them against the importer footprint from shadow-b.
primary, weights_with_primary = _start_primary(
manager,
frontend_port,
weights_gms,
kv_cache_gms,
weights_hash=weights_hash,
quiesced_memory_after_shadow_b=quiesced_memory_after_shadow_b,
shadow_b_released_bytes=shadow_b_released_bytes,
)
resume_result = _resume_shadow_after_primary_failover(
shadow_a,
......@@ -266,13 +223,6 @@ def _run_shadow_failover_test(
)
assert resume_result["status"] == "ok"
shadow_memory_after_resume = get_gpu_memory_used()
assert_memory_restored_after_quiesce(
"Shadow resume memory",
quiesced_memory_after_shadow_b,
shadow_memory_after_resume,
shadow_b_released_bytes,
)
# Once the primary is gone, the failover shadow should finish resume
# with the same committed weights layout and a new live RW KV-cache layout.
......@@ -339,27 +289,17 @@ def _trtllm_quiesce(
label: str,
expected_hash: str | None = None,
):
"""Quiesce a weights-only TRT-LLM engine and return state tuple."""
"""Quiesce a weights-only TRT-LLM engine and return the weights state."""
wait_for_weights_state(
weights_gms,
ServerState.RO,
expected_hash=expected_hash,
timeout=60.0,
)
mem_before = get_gpu_memory_used()
assert engine.quiesce()["status"] == "ok"
mem_after = get_gpu_memory_used()
released = mem_before - mem_after
logger.info(
"%s: %.2f -> %.2f GiB (freed %.0f MB)",
label,
mem_before / (1 << 30),
mem_after / (1 << 30),
released / (1 << 20),
)
assert released > 0
logger.info("%s completed", label)
ws = wait_for_weights_state(weights_gms, ServerState.COMMITTED)
return ws, released, mem_after
return ws
@pytest.mark.trtllm
......@@ -383,9 +323,7 @@ def test_gms_shadow_engine_failover_trtllm(
failure_message="Shadow A inference failed",
success_message="Shadow A inference OK",
)
ws_a, released_a, _ = _trtllm_quiesce(
weights_gms, shadow_a, label="Shadow A quiesce"
)
ws_a = _trtllm_quiesce(weights_gms, shadow_a, label="Shadow A quiesce")
weights_hash = ws_a.memory_layout_hash
# Shadow B starts RO, then quiesces.
......@@ -396,7 +334,7 @@ def test_gms_shadow_engine_failover_trtllm(
failure_message="Shadow B inference failed",
success_message="Shadow B inference OK",
)
_, _, mem_after_b = _trtllm_quiesce(
_trtllm_quiesce(
weights_gms,
shadow_b,
label="Shadow B quiesce",
......@@ -412,14 +350,6 @@ def test_gms_shadow_engine_failover_trtllm(
failure_message="Primary inference failed",
success_message="Primary inference OK",
)
primary_mem = get_gpu_memory_used()
assert_memory_restored_after_quiesce(
"Primary active",
mem_after_b,
primary_mem,
released_a,
min_fraction=0.6,
)
wait_for_weights_state(
weights_gms,
ServerState.RO,
......@@ -432,14 +362,6 @@ def test_gms_shadow_engine_failover_trtllm(
resume_result = shadow_a.resume(timeout=180)
assert resume_result["status"] == "ok"
shadow_mem = get_gpu_memory_used()
assert_memory_restored_after_quiesce(
"Shadow A resume",
mem_after_b,
shadow_mem,
released_a,
min_fraction=0.6,
)
wait_for_weights_state(
weights_gms,
ServerState.RO,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment