# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Test Execution Times (Last Run: 2026-01-13):
- test_request_migration_sglang_aggregated: ~75s
- test_request_migration_sglang_prefill: N/A
- test_request_migration_sglang_kv_transfer: N/A
- test_request_migration_sglang_decode: ~75s
"""

import logging
import os
import shutil

import pytest

from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.managed_process import ManagedProcess
from tests.utils.payloads import check_models_api
from tests.utils.port_utils import allocate_port, deallocate_port

# Customized utils for migration tests
from .utils import DynamoFrontendProcess, run_migration_test

logger = logging.getLogger(__name__)

pytestmark = [
    pytest.mark.fault_tolerance,
    pytest.mark.sglang,
    pytest.mark.gpu_1,
    pytest.mark.e2e,
    pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
    pytest.mark.post_merge,  # post_merge to pinpoint failure commit
    pytest.mark.parametrize(
        "migration_limit", [3, 0], ids=["migration_enabled", "migration_disabled"]
    ),
    pytest.mark.parametrize(
        "immediate_kill",
        [
            pytest.param(True, id="worker_failure"),
            pytest.param(
                False,
                id="graceful_shutdown",
                marks=pytest.mark.xfail(
                    strict=False, reason="SGLang graceful shutdown not yet implemented"
                ),
            ),
        ],
    ),
    pytest.mark.parametrize(
        "request_api",
        [
            pytest.param("chat"),
            pytest.param(
                "completion",
                marks=pytest.mark.skip(reason="Behavior unverified yet"),
            ),
        ],
    ),
    pytest.mark.parametrize(
        "stream",
        [
            pytest.param(True, id="stream"),
            pytest.param(
                False,
                id="unary",
                marks=pytest.mark.skip(reason="Behavior unverified yet"),
            ),
        ],
    ),
    pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
]


class DynamoWorkerProcess(ManagedProcess):
    """Process manager for Dynamo worker with SGLang backend.

    Supports both aggregated mode (single worker) and disaggregated mode
    (separate prefill and decode workers).

    Every port the worker needs (system port, and in disaggregated mode the
    bootstrap port and, for prefill, the ``--port`` value) is obtained from
    ``allocate_port`` and released in ``__exit__``. Nothing is hard-coded, so
    workers started concurrently by parallel tests do not collide.

    Args:
        request: pytest request fixture
        worker_id: Unique identifier for the worker (e.g., "worker1", "worker2")
        frontend_port: Port where the frontend is running
        disagg_mode: None for aggregated, "prefill" or "decode" for disaggregated
    """

    def __init__(
        self,
        request,
        worker_id: str,
        frontend_port: int,
        disagg_mode: str | None = None,
    ):
        self.worker_id = worker_id
        self.disagg_mode = disagg_mode
        self.system_port = allocate_port(9100)
        # Only allocated in disaggregated mode; None means "not allocated".
        self.bootstrap_port: int | None = None
        # Only allocated for prefill workers; None means "not allocated".
        self.engine_port: int | None = None

        command = [
            "python3",
            "-m",
            "dynamo.sglang",
            "--model-path",
            FAULT_TOLERANCE_MODEL_NAME,
            "--served-model-name",
            FAULT_TOLERANCE_MODEL_NAME,
            "--trust-remote-code",
            "--page-size",
            "16",
            "--tp",
            "1",
            "--mem-fraction-static",
            "0.3",
            "--context-length",
            "8192",
        ]

        if disagg_mode is None:
            # Aggregated
            command.append("--skip-tokenizer-init")
        else:
            # Disaggregated. The bootstrap port is allocated dynamically.
            # Deriving it from worker_id's last character would collide across
            # parallel test runs or between ids sharing a final character.
            self.bootstrap_port = allocate_port(12340)
            command.extend(
                [
                    "--disaggregation-mode",
                    disagg_mode,
                    "--disaggregation-bootstrap-port",
                    str(self.bootstrap_port),
                    "--host",
                    "0.0.0.0",
                    "--disaggregation-transfer-backend",
                    "nixl",
                ]
            )
            if disagg_mode == "prefill":
                # Each prefill worker gets its own --port. With a single shared
                # literal, two prefill workers started side by side (as in the
                # prefill test) would be handed the same port.
                self.engine_port = allocate_port(40000)
                command.extend(["--port", str(self.engine_port)])

        # Set environment variables
        env = os.environ.copy()
        env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
        env["DYN_LOG"] = "debug"
        # Disable canary health check - these tests expect full control over requests
        # sent to the workers where canary health check intermittently sends dummy
        # requests to workers interfering with the test process which may cause
        # intermittent failures
        env["DYN_HEALTH_CHECK_ENABLED"] = "false"
        env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
        env["DYN_SYSTEM_PORT"] = str(self.system_port)
        env["DYN_HTTP_PORT"] = str(frontend_port)

        # Configure health check based on worker type: every worker exposes the
        # system /health endpoint; only workers that serve the model to the
        # frontend (aggregated or decode) are also checked via /v1/models.
        health_check_urls = [
            (f"http://localhost:{self.system_port}/health", self.is_ready)
        ]
        if disagg_mode is None or disagg_mode == "decode":
            health_check_urls.append(
                (f"http://localhost:{frontend_port}/v1/models", check_models_api)
            )

        # TODO: Have the managed process take a command name explicitly to distinguish
        #       between processes started with the same command.
        log_dir = f"{request.node.name}_{worker_id}"

        # Clean up any existing log directory from previous runs
        try:
            shutil.rmtree(log_dir)
            logger.info(f"Cleaned up existing log directory: {log_dir}")
        except FileNotFoundError:
            # Directory doesn't exist, which is fine
            pass

        super().__init__(
            command=command,
            env=env,
            health_check_urls=health_check_urls,
            timeout=300,
            display_output=True,
            terminate_all_matching_process_names=False,
            stragglers=["SGLANG:EngineCore"],
            straggler_commands=["-m dynamo.sglang"],
            log_dir=log_dir,
        )

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Tear the worker down, then release every port allocated for it.

        The ports are released only after the base class's ``__exit__`` has
        run (that is presumably where the worker process is stopped). Releasing
        them earlier could hand a still-bound port to another test. The
        ``finally`` clause ensures ports are released even if teardown raises.
        """
        try:
            return super().__exit__(exc_type, exc_val, exc_tb)
        finally:
            self._release_ports()

    def _release_ports(self) -> None:
        """Best-effort release of every port allocated in ``__init__``."""
        for port in (self.system_port, self.bootstrap_port, self.engine_port):
            if port is None:
                continue
            try:
                deallocate_port(port)
            except Exception as e:
                # Teardown must not mask the test outcome; log and move on.
                logger.warning(f"Failed to release SGLang worker port {port}: {e}")

    def is_ready(self, response) -> bool:
        """Check the health of the worker process.

        Returns True only when the health response is a JSON object whose
        ``status`` field equals ``"ready"``. A non-JSON body, a non-object
        JSON body, or any other status is logged and reported as not ready.
        """
        try:
            data = response.json()
        except ValueError:
            logger.warning(f"{self.worker_id} health response is not valid JSON")
            return False

        status = data.get("status") if isinstance(data, dict) else None
        if status == "ready":
            logger.info(f"{self.worker_id} status is ready")
            return True
        logger.warning(f"{self.worker_id} status is not ready: {status}")
        return False


@pytest.mark.timeout(230)  # 3x average
def test_request_migration_sglang_aggregated(
    request,
    runtime_services_dynamic_ports,
    set_ucx_tls_no_mm,
    predownload_models,
    migration_limit,
    immediate_kill,
    request_api,
    stream,
):
    """
    End-to-end test for aggregated worker request migration.

    Parameters:
        immediate_kill: True for abrupt kill (SIGKILL), False for graceful shutdown (SIGTERM)
        migration_limit: > 0 to verify migration succeeds, 0 to verify request fails
        request_api: "chat" for chat completion API, "completion" for completion API
        stream: True for streaming, False for non-streaming
    """

    # Step 1: Start the frontend
    with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start 2 workers
        with DynamoWorkerProcess(request, "worker1", frontend.frontend_port) as worker1:
            logger.info(f"Worker 1 PID: {worker1.get_pid()}")

            with DynamoWorkerProcess(
                request,
                "worker2",
                frontend.frontend_port,
            ) as worker2:
                logger.info(f"Worker 2 PID: {worker2.get_pid()}")

                # Step 3: Run migration test
                run_migration_test(
                    frontend,
                    worker1,
                    worker2,
                    receiving_pattern="New Request ID: ",
                    migration_limit=migration_limit,
                    immediate_kill=immediate_kill,
                    use_chat_completion=(request_api == "chat"),
                    stream=stream,
                )


@pytest.mark.skip(reason="Cannot reliably migrate at Prefill that finish < 1 ms")
@pytest.mark.xfail(strict=False, reason="Prefill migration not yet supported")
@pytest.mark.timeout(230)  # 3x average
def test_request_migration_sglang_prefill(
    request,
    runtime_services_dynamic_ports,
    set_ucx_tls_no_mm,
    predownload_models,
    migration_limit,
    immediate_kill,
    request_api,
    stream,
):
    """
    End-to-end test for prefill worker request migration in disaggregated mode.

    Setup: 1 decode worker + 2 prefill workers

    Parameters:
        immediate_kill: True for abrupt kill (SIGKILL), False for graceful shutdown (SIGTERM)
        migration_limit: > 0 to verify migration succeeds, 0 to verify request fails
        request_api: "chat" for chat completion API, "completion" for completion API
        stream: True for streaming, False for non-streaming
    """

    # Step 1: Start the frontend
    with DynamoFrontendProcess(
        request, migration_limit=migration_limit, enforce_disagg=True
    ) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start decode worker first (required for prefill workers to connect)
        with DynamoWorkerProcess(
            request,
            "worker0",
            frontend.frontend_port,
            disagg_mode="decode",
        ) as decode_worker:
            logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")

            # Step 3: Start 2 prefill workers
            with DynamoWorkerProcess(
                request,
                "worker1",
                frontend.frontend_port,
                disagg_mode="prefill",
            ) as prefill1:
                logger.info(f"Prefill Worker 1 PID: {prefill1.get_pid()}")

                with DynamoWorkerProcess(
                    request,
                    "worker2",
                    frontend.frontend_port,
                    disagg_mode="prefill",
                ) as prefill2:
                    logger.info(f"Prefill Worker 2 PID: {prefill2.get_pid()}")

                    # Step 4: Run migration test
                    run_migration_test(
                        frontend,
                        prefill1,
                        prefill2,
                        receiving_pattern="New Request ID: ",
                        migration_limit=migration_limit,
                        immediate_kill=immediate_kill,
                        use_chat_completion=(request_api == "chat"),
                        stream=stream,
                        use_long_prompt=True,
                    )


@pytest.mark.skip(reason="KV cache transfer may fail")
@pytest.mark.timeout(230)  # 3x average
def test_request_migration_sglang_kv_transfer(
    request,
    runtime_services_dynamic_ports,
    set_ucx_tls_no_mm,
    predownload_models,
    migration_limit,
    immediate_kill,
    request_api,
    stream,
):
    """
    End-to-end test for request migration during KV transfer in disaggregated mode.

    Setup: 1 prefill worker + 2 decode workers

    Parameters:
        immediate_kill: True for abrupt kill (SIGKILL), False for graceful shutdown (SIGTERM)
        migration_limit: > 0 to verify migration succeeds, 0 to verify request fails
        request_api: "chat" for chat completion API, "completion" for completion API
        stream: True for streaming, False for non-streaming
    """

    # Step 1: Start the frontend
    with DynamoFrontendProcess(
        request, migration_limit=migration_limit, enforce_disagg=True
    ) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start prefill worker first
        with DynamoWorkerProcess(
            request,
            "worker0",
            frontend.frontend_port,
            disagg_mode="prefill",
        ) as prefill_worker:
            logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")

            # Step 3: Start 2 decode workers
            with DynamoWorkerProcess(
                request,
                "worker1",
                frontend.frontend_port,
                disagg_mode="decode",
            ) as decode1:
                logger.info(f"Decode Worker 1 PID: {decode1.get_pid()}")

                with DynamoWorkerProcess(
                    request,
                    "worker2",
                    frontend.frontend_port,
                    disagg_mode="decode",
                ) as decode2:
                    logger.info(f"Decode Worker 2 PID: {decode2.get_pid()}")

                    # Step 4: Run migration test
                    run_migration_test(
                        frontend,
                        decode1,
                        decode2,
                        receiving_pattern="New Request ID: ",
                        migration_limit=migration_limit,
                        immediate_kill=immediate_kill,
                        use_chat_completion=(request_api == "chat"),
                        stream=stream,
                        use_long_prompt=True,
                    )


@pytest.mark.timeout(230)  # 3x average
def test_request_migration_sglang_decode(
    request,
    runtime_services_dynamic_ports,
    set_ucx_tls_no_mm,
    predownload_models,
    migration_limit,
    immediate_kill,
    request_api,
    stream,
):
    """
    End-to-end test for decode worker request migration in disaggregated mode.

    Setup: 1 prefill worker + 2 decode workers

    Parameters:
        immediate_kill: True for abrupt kill (SIGKILL), False for graceful shutdown (SIGTERM)
        migration_limit: > 0 to verify migration succeeds, 0 to verify request fails
        request_api: "chat" for chat completion API, "completion" for completion API
        stream: True for streaming, False for non-streaming
    """
    if not stream:
        pytest.skip(
            "Decode test requires streaming to wait for response before stopping worker"
        )

    # Step 1: Start the frontend
    with DynamoFrontendProcess(
        request, migration_limit=migration_limit, enforce_disagg=True
    ) as frontend:
        logger.info("Frontend started successfully")

        # Step 2: Start prefill worker first
        with DynamoWorkerProcess(
            request,
            "worker0",
            frontend.frontend_port,
            disagg_mode="prefill",
        ) as prefill_worker:
            logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")

            # Step 3: Start 2 decode workers
            with DynamoWorkerProcess(
                request,
                "worker1",
                frontend.frontend_port,
                disagg_mode="decode",
            ) as decode1:
                logger.info(f"Decode Worker 1 PID: {decode1.get_pid()}")

                with DynamoWorkerProcess(
                    request,
                    "worker2",
                    frontend.frontend_port,
                    disagg_mode="decode",
                ) as decode2:
                    logger.info(f"Decode Worker 2 PID: {decode2.get_pid()}")

                    # Step 4: Run migration test
                    run_migration_test(
                        frontend,
                        decode1,
                        decode2,
                        receiving_pattern="New Request ID: ",
                        migration_limit=migration_limit,
                        immediate_kill=immediate_kill,
                        use_chat_completion=(request_api == "chat"),
                        stream=stream,
                        wait_for_new_response_before_stop=True,
                    )