Unverified Commit 8ed69ea2 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

fix(test): fix vllm disaggregated cancellation tests (#6758)


Signed-off-by: default avatarTzu-Ling <tzulingk@nvidia.com>
Co-authored-by: default avatarClaude Sonnet 4.6 <noreply@anthropic.com>
parent aedfc0a3
...@@ -32,7 +32,6 @@ logger = logging.getLogger(__name__) ...@@ -32,7 +32,6 @@ logger = logging.getLogger(__name__)
pytestmark = [ pytestmark = [
pytest.mark.fault_tolerance, pytest.mark.fault_tolerance,
pytest.mark.vllm, pytest.mark.vllm,
pytest.mark.gpu_1,
pytest.mark.e2e, pytest.mark.e2e,
pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME), pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True), pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
...@@ -46,7 +45,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -46,7 +45,7 @@ class DynamoWorkerProcess(ManagedProcess):
self, self,
request, request,
frontend_port: int, frontend_port: int,
is_prefill: bool = False, is_prefill: bool | None = None,
): ):
# Allocate system port for this worker # Allocate system port for this worker
system_port = allocate_port(9100) system_port = allocate_port(9100)
...@@ -66,16 +65,35 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -66,16 +65,35 @@ class DynamoWorkerProcess(ManagedProcess):
"16384", "16384",
] ]
# Configure health check based on worker type # Configure disaggregation mode, KV transfer, and health checks per worker type
if is_prefill: if is_prefill is True:
# Prefill workers check their own status endpoint # Prefill worker: disaggregated prefill mode; check own status endpoint only
command.extend(["--disaggregation-mode", "prefill"]) command.extend(["--disaggregation-mode", "prefill"])
command.extend(
[
"--kv-transfer-config",
'{"kv_connector":"NixlConnector","kv_role":"kv_both"}',
]
)
health_check_urls = [ health_check_urls = [
(f"http://localhost:{system_port}/health", self.is_ready) (f"http://localhost:{system_port}/health", self.is_ready)
] ]
elif is_prefill is False:
# Decode worker: disaggregated decode mode; also verify frontend sees the model
command.extend(["--disaggregation-mode", "decode"])
command.extend(
[
"--kv-transfer-config",
'{"kv_connector":"NixlConnector","kv_role":"kv_both"}',
]
)
health_check_urls = [
(f"http://localhost:{system_port}/health", self.is_ready),
(f"http://localhost:{frontend_port}/v1/models", check_models_api),
(f"http://localhost:{frontend_port}/health", check_health_generate),
]
else: else:
# Decode workers should also check their own status endpoint first, # Aggregated worker: no disaggregation mode; verify frontend sees the model
# then verify the frontend sees the model
health_check_urls = [ health_check_urls = [
(f"http://localhost:{system_port}/health", self.is_ready), (f"http://localhost:{system_port}/health", self.is_ready),
(f"http://localhost:{frontend_port}/v1/models", check_models_api), (f"http://localhost:{frontend_port}/v1/models", check_models_api),
...@@ -98,7 +116,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -98,7 +116,7 @@ class DynamoWorkerProcess(ManagedProcess):
# Set KV events config and NIXL side channel port only for prefill worker # Set KV events config and NIXL side channel port only for prefill worker
# to avoid conflicts with decode worker # to avoid conflicts with decode worker
if is_prefill: if is_prefill is True:
command.extend( command.extend(
[ [
"--kv-events-config", "--kv-events-config",
...@@ -117,7 +135,12 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -117,7 +135,12 @@ class DynamoWorkerProcess(ManagedProcess):
] = "5601" # TODO: use dynamic port allocation ] = "5601" # TODO: use dynamic port allocation
# Set log directory based on worker type # Set log directory based on worker type
worker_type = "prefill_worker" if is_prefill else "worker" if is_prefill is True:
worker_type = "prefill_worker"
elif is_prefill is False:
worker_type = "decode_worker"
else:
worker_type = "worker"
log_dir = f"{request.node.name}_{worker_type}" log_dir = f"{request.node.name}_{worker_type}"
# Clean up any existing log directory from previous runs # Clean up any existing log directory from previous runs
...@@ -179,6 +202,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -179,6 +202,7 @@ class DynamoWorkerProcess(ManagedProcess):
@pytest.mark.timeout(110) # 3x average @pytest.mark.timeout(110) # 3x average
@pytest.mark.post_merge @pytest.mark.post_merge
@pytest.mark.gpu_1
def test_request_cancellation_vllm_aggregated( def test_request_cancellation_vllm_aggregated(
request, runtime_services_dynamic_ports, predownload_models request, runtime_services_dynamic_ports, predownload_models
): ):
...@@ -261,6 +285,7 @@ def test_request_cancellation_vllm_aggregated( ...@@ -261,6 +285,7 @@ def test_request_cancellation_vllm_aggregated(
@pytest.mark.timeout(150) # 3x average @pytest.mark.timeout(150) # 3x average
@pytest.mark.nightly @pytest.mark.nightly
@pytest.mark.gpu_2
def test_request_cancellation_vllm_decode_cancel( def test_request_cancellation_vllm_decode_cancel(
request, runtime_services_dynamic_ports, set_ucx_tls_no_mm, predownload_models request, runtime_services_dynamic_ports, set_ucx_tls_no_mm, predownload_models
): ):
...@@ -343,6 +368,7 @@ def test_request_cancellation_vllm_decode_cancel( ...@@ -343,6 +368,7 @@ def test_request_cancellation_vllm_decode_cancel(
@pytest.mark.timeout(150) # 3x average @pytest.mark.timeout(150) # 3x average
@pytest.mark.nightly @pytest.mark.nightly
@pytest.mark.gpu_2
def test_request_cancellation_vllm_prefill_cancel( def test_request_cancellation_vllm_prefill_cancel(
request, runtime_services_dynamic_ports, set_ucx_tls_no_mm, predownload_models request, runtime_services_dynamic_ports, set_ucx_tls_no_mm, predownload_models
): ):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment