Unverified commit 0f01e724, authored by Jacky and committed by GitHub
Browse files

fix: ETCD cluster unhealthy when adding new member during E2E test & fix vLLM...


fix: ETCD cluster unhealthy when adding new member during E2E test & fix vLLM disagg launch config changed (#6976)
Signed-off-by: Jacky <18255193+kthui@users.noreply.github.com>
parent 35b0ce62
...@@ -5,6 +5,7 @@ import json ...@@ -5,6 +5,7 @@ import json
import logging import logging
import os import os
import shutil import shutil
from enum import Enum
import pytest import pytest
...@@ -28,10 +29,21 @@ pytestmark = [ ...@@ -28,10 +29,21 @@ pytestmark = [
] ]
class WorkerMode(Enum):
    """Role a Dynamo vLLM worker is launched in.

    The member chosen selects the worker's launch configuration (port,
    disaggregation flags, health checks) in ``DynamoWorkerProcess``.
    """

    # Default mode: the worker is started without a disaggregation flag.
    AGGREGATED = "aggregated"
    # Worker started with ``--disaggregation-mode prefill``.
    PREFILL = "prefill"
    # Worker started with ``--disaggregation-mode decode``.
    DECODE = "decode"
class DynamoWorkerProcess(ManagedProcess): class DynamoWorkerProcess(ManagedProcess):
"""Process manager for Dynamo worker with vLLM backend and ETCD HA support""" """Process manager for Dynamo worker with vLLM backend and ETCD HA support"""
def __init__(self, request, etcd_endpoints: list, is_prefill: bool = False): def __init__(
self,
request,
etcd_endpoints: list,
mode: WorkerMode = WorkerMode.AGGREGATED,
):
command = [ command = [
"python3", "python3",
"-m", "-m",
...@@ -46,16 +58,15 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -46,16 +58,15 @@ class DynamoWorkerProcess(ManagedProcess):
] ]
# Set port based on worker type # Set port based on worker type
port = "8082" if is_prefill else "8081" port = "8082" if mode == WorkerMode.PREFILL else "8081"
# Configure health check based on worker type # Configure disaggregation mode, KV transfer, and health checks per worker type
if is_prefill: if mode == WorkerMode.PREFILL:
# Prefill workers check their own status endpoint
command.extend(["--disaggregation-mode", "prefill"]) command.extend(["--disaggregation-mode", "prefill"])
health_check_urls = [(f"http://localhost:{port}/health", self.is_ready)] health_check_urls = [(f"http://localhost:{port}/health", self.is_ready)]
else: else:
# Decode workers should also check their own status endpoint first, if mode == WorkerMode.DECODE:
# then verify the frontend sees the model command.extend(["--disaggregation-mode", "decode"])
health_check_urls = [ health_check_urls = [
(f"http://localhost:{port}/health", self.is_ready), (f"http://localhost:{port}/health", self.is_ready),
(f"http://localhost:{FRONTEND_PORT}/v1/models", check_models_api), (f"http://localhost:{FRONTEND_PORT}/v1/models", check_models_api),
...@@ -69,7 +80,22 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -69,7 +80,22 @@ class DynamoWorkerProcess(ManagedProcess):
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
env["DYN_SYSTEM_PORT"] = port env["DYN_SYSTEM_PORT"] = port
if is_prefill: # Both prefill and decode workers need kv-transfer-config for disaggregated mode
if mode != WorkerMode.AGGREGATED:
command.extend(
[
"--kv-transfer-config",
json.dumps(
{
"kv_connector": "NixlConnector",
"kv_role": "kv_both",
}
),
]
)
# KV events config and NIXL side channel port only for prefill worker
if mode == WorkerMode.PREFILL:
command.extend( command.extend(
[ [
"--kv-events-config", "--kv-events-config",
...@@ -86,7 +112,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -86,7 +112,7 @@ class DynamoWorkerProcess(ManagedProcess):
env["VLLM_NIXL_SIDE_CHANNEL_PORT"] = "5601" env["VLLM_NIXL_SIDE_CHANNEL_PORT"] = "5601"
# Set log directory based on worker type # Set log directory based on worker type
worker_type = "prefill_worker" if is_prefill else "worker" worker_type = "prefill_worker" if mode == WorkerMode.PREFILL else "worker"
log_dir = f"{request.node.name}_{worker_type}" log_dir = f"{request.node.name}_{worker_type}"
# Clean up any existing log directory from previous runs # Clean up any existing log directory from previous runs
...@@ -112,20 +138,18 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -112,20 +138,18 @@ class DynamoWorkerProcess(ManagedProcess):
log_dir=log_dir, log_dir=log_dir,
) )
self.is_prefill = is_prefill self.mode = mode
def is_ready(self, response) -> bool: def is_ready(self, response) -> bool:
"""Check the health of the worker process""" """Check the health of the worker process"""
worker_type = "Prefill worker" if self.mode == WorkerMode.PREFILL else "Worker"
try: try:
data = response.json() data = response.json()
if data.get("status") == "ready": if data.get("status") == "ready":
worker_type = "Prefill worker" if self.is_prefill else "Worker"
logger.info(f"{worker_type} status is ready") logger.info(f"{worker_type} status is ready")
return True return True
worker_type = "Prefill worker" if self.is_prefill else "Worker"
logger.warning(f"{worker_type} status is not ready: {data.get('status')}") logger.warning(f"{worker_type} status is not ready: {data.get('status')}")
except ValueError: except ValueError:
worker_type = "Prefill worker" if self.is_prefill else "Worker"
logger.warning(f"{worker_type} health response is not valid JSON") logger.warning(f"{worker_type} health response is not valid JSON")
return False return False
...@@ -242,11 +266,15 @@ def test_etcd_ha_failover_vllm_disaggregated( ...@@ -242,11 +266,15 @@ def test_etcd_ha_failover_vllm_disaggregated(
logger.info("Frontend started successfully") logger.info("Frontend started successfully")
# Step 4: Start the prefill worker # Step 4: Start the prefill worker
with DynamoWorkerProcess(request, etcd_endpoints, is_prefill=True): with DynamoWorkerProcess(
request, etcd_endpoints, mode=WorkerMode.PREFILL
):
logger.info("Prefill worker started successfully") logger.info("Prefill worker started successfully")
# Step 5: Start the decode worker # Step 5: Start the decode worker
with DynamoWorkerProcess(request, etcd_endpoints, is_prefill=False): with DynamoWorkerProcess(
request, etcd_endpoints, mode=WorkerMode.DECODE
):
logger.info("Decode worker started successfully") logger.info("Decode worker started successfully")
# Step 6: Send initial inference request to verify system is working # Step 6: Send initial inference request to verify system is working
...@@ -369,13 +397,13 @@ def test_etcd_non_ha_shutdown_vllm_disaggregated( ...@@ -369,13 +397,13 @@ def test_etcd_non_ha_shutdown_vllm_disaggregated(
# Step 4: Start the prefill worker # Step 4: Start the prefill worker
with DynamoWorkerProcess( with DynamoWorkerProcess(
request, etcd_endpoints, is_prefill=True request, etcd_endpoints, mode=WorkerMode.PREFILL
) as prefill_worker: ) as prefill_worker:
logger.info("Prefill worker started successfully") logger.info("Prefill worker started successfully")
# Step 5: Start the decode worker # Step 5: Start the decode worker
with DynamoWorkerProcess( with DynamoWorkerProcess(
request, etcd_endpoints, is_prefill=False request, etcd_endpoints, mode=WorkerMode.DECODE
) as decode_worker: ) as decode_worker:
logger.info("Decode worker started successfully") logger.info("Decode worker started successfully")
......
...@@ -294,9 +294,12 @@ class EtcdCluster: ...@@ -294,9 +294,12 @@ class EtcdCluster:
except Exception as e: except Exception as e:
raise RuntimeError(f"Error during member removal: {e}") raise RuntimeError(f"Error during member removal: {e}")
# Add the new member to the cluster # Add the new member to the cluster, retrying if the cluster is temporarily unhealthy
# after member removal (etcd may reject adds until raft peers are fully connected)
logger.info(f"Adding new member {name} to cluster with peer URL {peer_url}") logger.info(f"Adding new member {name} to cluster with peer URL {peer_url}")
try: max_attempts = 20
last_err = ""
for attempt in range(max_attempts):
add_result = subprocess.run( add_result = subprocess.run(
["etcdctl", "member", "add", name, f"--peer-urls={peer_url}"], ["etcdctl", "member", "add", name, f"--peer-urls={peer_url}"],
env=etcdctl_env, env=etcdctl_env,
...@@ -304,11 +307,18 @@ class EtcdCluster: ...@@ -304,11 +307,18 @@ class EtcdCluster:
text=True, text=True,
timeout=5, timeout=5,
) )
if add_result.returncode != 0: if add_result.returncode == 0:
raise RuntimeError(f"Failed to add new member: {add_result.stderr}") logger.info(f"Successfully added new member {name}")
logger.info(f"Successfully added new member {name}") break
except Exception as e: last_err = add_result.stderr.strip()
raise RuntimeError(f"Error adding new member: {e}") logger.warning(
f"Member add attempt {attempt + 1}/{max_attempts} failed: {last_err}"
)
time.sleep(0.5) # time for cluster to stabilize before retrying
else:
raise RuntimeError(
f"Failed to add new member after {max_attempts} attempts: {last_err}"
)
def start(self): def start(self):
"""Start ETCD cluster with configured number of replicas""" """Start ETCD cluster with configured number of replicas"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment