Unverified Commit f10f5942 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

refactor: consolidate fault_tolerance DynamoFrontendProcess, add timeouts (#5008)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent e1c685b1
......@@ -2,9 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import re
import shutil
import socket
import threading
import time
......@@ -14,61 +12,33 @@ import pytest
import requests
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.managed_process import (
DynamoFrontendProcess as BaseDynamoFrontendProcess,
)
from tests.utils.managed_process import ManagedProcess
from tests.utils.port_utils import allocate_port, deallocate_port
logger = logging.getLogger(__name__)
class DynamoFrontendProcess(ManagedProcess):
"""Process manager for Dynamo frontend"""
class DynamoFrontendProcess(BaseDynamoFrontendProcess):
"""Fault-tolerance frontend wrapper (keeps env settings from the historical helper)."""
def __init__(self, request):
# Allocate frontend port
frontend_port = allocate_port(8100)
self.frontend_port = frontend_port
command = ["python", "-m", "dynamo.frontend", "--http-port", str(frontend_port)]
env = os.environ.copy()
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
env["DYN_LOG"] = "debug"
# Disable canary health check - these tests expect full control over requests
# sent to the workers where canary health check intermittently sends dummy
# requests to workers interfering with the test process which may cause
# intermittent failures
env["DYN_HEALTH_CHECK_ENABLED"] = "false"
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
env.pop("DYN_SYSTEM_PORT", None)
log_dir = f"{request.node.name}_frontend"
# Clean up any existing log directory from previous runs
try:
shutil.rmtree(log_dir)
logger.info(f"Cleaned up existing log directory: {log_dir}")
except FileNotFoundError:
# Directory doesn't exist, which is fine
pass
extra_env = {
"DYN_REQUEST_PLANE": request.getfixturevalue("request_plane"),
"DYN_LOG": "debug",
# These tests expect full control over requests sent to workers. The canary
# health check can inject extra requests and cause intermittent failures.
"DYN_HEALTH_CHECK_ENABLED": "false",
}
super().__init__(
command=command,
env=env,
display_output=True,
terminate_existing=False, # Don't terminate other processes of the same name, we'll only terminate our own PID
log_dir=log_dir,
request,
frontend_port=0, # allocate a free port (xdist-safe)
router_mode="round-robin",
extra_env=extra_env,
terminate_existing=False,
)
def __exit__(self, exc_type, exc_val, exc_tb):
"""Release allocated port when frontend exits."""
try:
# frontend_port is always allocated in __init__
deallocate_port(self.frontend_port)
except Exception as e:
logger.warning(f"Failed to release frontend port: {e}")
return super().__exit__(exc_type, exc_val, exc_tb)
class CancellableRequest:
"""A wrapper for a single request that can be explicitly cancelled.
......
......@@ -152,6 +152,7 @@ class DynamoWorkerProcess(ManagedProcess):
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_ha_failover_sglang_aggregated(request, predownload_models):
"""
Test ETCD High Availability with repeated node failures and recoveries using SGLang.
......@@ -224,6 +225,7 @@ def test_etcd_ha_failover_sglang_aggregated(request, predownload_models):
@pytest.mark.gpu_2
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_ha_failover_sglang_disaggregated(
request, predownload_models, set_ucx_tls_no_mm
):
......@@ -304,6 +306,7 @@ def test_etcd_ha_failover_sglang_disaggregated(
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_non_ha_shutdown_sglang_aggregated(request, predownload_models):
"""
Test that frontend and worker shut down when single ETCD node is terminated using SGLang.
......@@ -360,6 +363,7 @@ def test_etcd_non_ha_shutdown_sglang_aggregated(request, predownload_models):
@pytest.mark.gpu_2
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_non_ha_shutdown_sglang_disaggregated(
request, predownload_models, set_ucx_tls_no_mm
):
......
......@@ -134,6 +134,7 @@ class DynamoWorkerProcess(ManagedProcess):
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_ha_failover_trtllm_aggregated(request, predownload_models):
"""
Test ETCD High Availability with repeated node failures and recoveries for TRT-LLM in aggregated mode.
......@@ -206,6 +207,7 @@ def test_etcd_ha_failover_trtllm_aggregated(request, predownload_models):
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_ha_failover_trtllm_disaggregated(
request, predownload_models, set_ucx_tls_no_mm
):
......@@ -285,6 +287,7 @@ def test_etcd_ha_failover_trtllm_disaggregated(
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_non_ha_shutdown_trtllm_aggregated(request, predownload_models):
"""
Test that frontend and worker shut down when single ETCD node is terminated for TRT-LLM in aggregated mode.
......@@ -344,6 +347,7 @@ def test_etcd_non_ha_shutdown_trtllm_aggregated(request, predownload_models):
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_non_ha_shutdown_trtllm_disaggregated(
request, predownload_models, set_ucx_tls_no_mm
):
......
......@@ -117,6 +117,7 @@ class DynamoWorkerProcess(ManagedProcess):
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.nightly
@pytest.mark.timeout(600)
def test_etcd_ha_failover_vllm_aggregated(request, predownload_models):
"""
Test ETCD High Availability with repeated node failures and recoveries.
......@@ -188,6 +189,7 @@ def test_etcd_ha_failover_vllm_aggregated(request, predownload_models):
@pytest.mark.e2e
@pytest.mark.nightly
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_ha_failover_vllm_disaggregated(
request, predownload_models, set_ucx_tls_no_mm
):
......@@ -265,6 +267,7 @@ def test_etcd_ha_failover_vllm_disaggregated(
@pytest.mark.e2e
@pytest.mark.nightly
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_non_ha_shutdown_vllm_aggregated(request, predownload_models):
"""
Test that frontend and worker shut down when single ETCD node is terminated.
......@@ -320,6 +323,7 @@ def test_etcd_non_ha_shutdown_vllm_aggregated(request, predownload_models):
@pytest.mark.e2e
@pytest.mark.nightly
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.timeout(600)
def test_etcd_non_ha_shutdown_vllm_disaggregated(
request, predownload_models, set_ucx_tls_no_mm
):
......
......@@ -15,39 +15,27 @@ import requests
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.engine_process import FRONTEND_PORT
from tests.utils.managed_process import (
DynamoFrontendProcess as BaseDynamoFrontendProcess,
)
from tests.utils.managed_process import ManagedProcess
logger = logging.getLogger(__name__)
class DynamoFrontendProcess(ManagedProcess):
"""Process manager for Dynamo frontend with ETCD HA support"""
def __init__(self, request, etcd_endpoints: list):
command = ["python", "-m", "dynamo.frontend"]
# Set debug logging and ETCD endpoints
env = os.environ.copy()
env["DYN_LOG"] = "debug"
env["ETCD_ENDPOINTS"] = ",".join(etcd_endpoints)
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
env.pop("DYN_SYSTEM_PORT", None)
log_dir = f"{request.node.name}_frontend"
# Clean up any existing log directory from previous runs
try:
shutil.rmtree(log_dir)
logger.info(f"Cleaned up existing log directory: {log_dir}")
except FileNotFoundError:
pass
class DynamoFrontendProcess(BaseDynamoFrontendProcess):
"""Process manager for Dynamo frontend with ETCD HA support."""
def __init__(self, request, etcd_endpoints: list[str]):
extra_env = {
"DYN_LOG": "debug",
"ETCD_ENDPOINTS": ",".join(etcd_endpoints),
}
super().__init__(
command=command,
env=env,
display_output=True,
request,
router_mode="round-robin",
extra_env=extra_env,
terminate_existing=True,
log_dir=log_dir,
)
......
......@@ -190,6 +190,7 @@ def test_request_migration_trtllm_worker_failure(
verify_migration_occurred(frontend)
@pytest.mark.timeout(290) # 3x average
@pytest.mark.skip(reason="TRT-LLM graceful shutdown not yet implemented")
def test_request_migration_trtllm_graceful_shutdown(
request, runtime_services_dynamic_ports, set_ucx_tls_no_mm, predownload_models
......@@ -324,6 +325,7 @@ def test_no_request_migration_trtllm_worker_failure(
), f"Unexpected migration message: {e}"
@pytest.mark.timeout(185) # 3x average
@pytest.mark.skip(reason="TRT-LLM graceful shutdown not yet implemented")
def test_no_request_migration_trtllm_graceful_shutdown(
request, runtime_services_dynamic_ports, set_ucx_tls_no_mm, predownload_models
......
......@@ -2,8 +2,6 @@
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import shutil
import threading
import time
......@@ -11,68 +9,32 @@ import pytest
import requests
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.managed_process import (
DynamoFrontendProcess as BaseDynamoFrontendProcess,
)
from tests.utils.managed_process import ManagedProcess
from tests.utils.port_utils import allocate_port, deallocate_port
logger = logging.getLogger(__name__)
class DynamoFrontendProcess(ManagedProcess):
"""Process manager for Dynamo frontend"""
class DynamoFrontendProcess(BaseDynamoFrontendProcess):
"""Fault-tolerance frontend wrapper (keeps env settings from the historical helper)."""
def __init__(self, request):
# Allocate frontend port
frontend_port = allocate_port(8100)
self.frontend_port = frontend_port
command = [
"python",
"-m",
"dynamo.frontend",
"--router-mode",
"round-robin",
"--http-port",
str(frontend_port),
]
env = os.environ.copy()
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
# Disable canary health check - these tests expect full control over requests
# sent to the workers where canary health check intermittently sends dummy
# requests to workers interfering with the test process which may cause
# intermittent failures
env["DYN_HEALTH_CHECK_ENABLED"] = "false"
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
env.pop("DYN_SYSTEM_PORT", None)
log_dir = f"{request.node.name}_frontend"
# Clean up any existing log directory from previous runs
try:
shutil.rmtree(log_dir)
logger.info(f"Cleaned up existing log directory: {log_dir}")
except FileNotFoundError:
# Directory doesn't exist, which is fine
pass
extra_env = {
"DYN_REQUEST_PLANE": request.getfixturevalue("request_plane"),
# These tests expect full control over requests sent to workers. The canary
# health check can inject extra requests and cause intermittent failures.
"DYN_HEALTH_CHECK_ENABLED": "false",
}
super().__init__(
command=command,
env=env,
display_output=True,
terminate_existing=False, # Don't terminate other processes of the same name, we'll only terminate our own PID
log_dir=log_dir,
request,
frontend_port=0, # allocate a free port (xdist-safe)
router_mode="round-robin",
extra_env=extra_env,
terminate_existing=False,
)
def __exit__(self, exc_type, exc_val, exc_tb):
"""Release allocated port when frontend exits."""
try:
# frontend_port is always allocated in __init__
deallocate_port(self.frontend_port)
except Exception as e:
logger.warning(f"Failed to release frontend port: {e}")
return super().__exit__(exc_type, exc_val, exc_tb)
def start_completion_request(frontend_port: int) -> tuple:
"""
......
......@@ -11,45 +11,12 @@ import requests
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.engine_process import FRONTEND_PORT
from tests.utils.managed_process import ManagedProcess
from tests.utils.managed_process import DynamoFrontendProcess, ManagedProcess
from tests.utils.payloads import check_models_api, completions_response_handler
logger = logging.getLogger(__name__)
class DynamoFrontendProcess(ManagedProcess):
"""Process manager for Dynamo frontend"""
def __init__(self, request):
command = ["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"]
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
env = os.environ.copy()
env.pop("DYN_SYSTEM_PORT", None)
log_dir = f"{request.node.name}_frontend"
# Clean up any existing log directory from previous runs
try:
shutil.rmtree(log_dir)
logger.info(f"Cleaned up existing log directory: {log_dir}")
except FileNotFoundError:
# Directory doesn't exist, which is fine
pass
super().__init__(
command=command,
env=env,
display_output=True,
terminate_existing=True,
log_dir=log_dir,
)
def get_pid(self) -> int | None:
"""Get the PID of the worker process"""
return self.proc.pid if self.proc else None
class DynamoWorkerProcess(ManagedProcess):
"""Process manager for Dynamo worker with vLLM backend"""
......@@ -163,6 +130,7 @@ def send_completion_request(
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.nightly
@pytest.mark.timeout(160) # 3x average (~50s)
@pytest.mark.skip(reason="Flaky, temporarily disabled")
def test_vllm_health_check_active(request, runtime_services):
"""
......@@ -220,6 +188,7 @@ def test_vllm_health_check_active(request, runtime_services):
@pytest.mark.e2e
@pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
@pytest.mark.nightly
@pytest.mark.timeout(160) # 3x average (~50s)
def test_vllm_health_check_passive(request, runtime_services, predownload_models):
"""
End-to-end test for worker fault tolerance with migration support.
......
......@@ -595,6 +595,7 @@ class DynamoFrontendProcess(ManagedProcess):
router_mode: str = "round-robin",
extra_args: Optional[list[str]] = None,
extra_env: Optional[dict[str, str]] = None,
terminate_existing: bool = True,
):
# TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to
# use this shared implementation (and delete the copies):
......@@ -602,10 +603,6 @@ class DynamoFrontendProcess(ManagedProcess):
# - tests/frontend/test_completion_mocker_engine.py
# - tests/frontend/grpc/test_tensor_parameters.py
# - tests/frontend/grpc/test_tensor_mocker_engine.py
# - tests/fault_tolerance/cancellation/utils.py
# - tests/fault_tolerance/migration/utils.py
# - tests/fault_tolerance/etcd_ha/utils.py
# - tests/fault_tolerance/test_vllm_health_check.py
self._allocated_http_port: Optional[int] = None
if frontend_port == 0:
# Treat `0` as "allocate a random free port" for xdist-safe tests.
......@@ -646,7 +643,7 @@ class DynamoFrontendProcess(ManagedProcess):
command=command,
env=env,
display_output=True,
terminate_existing=True,
terminate_existing=terminate_existing,
log_dir=log_dir,
)
......@@ -658,6 +655,11 @@ class DynamoFrontendProcess(ManagedProcess):
deallocate_port(self._allocated_http_port)
self._allocated_http_port = None
@property
def frontend_port(self) -> int:
"""Back-compat alias for older tests that expect `frontend.frontend_port`."""
return self.http_port
def main():
# NOTE: This entrypoint is for manual testing/debugging of `ManagedProcess` only.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment