Unverified Commit 166e1f4d authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: use dynamic port allocation for DYN_SYSTEM_PORT in e2e router t… (#6262)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent d3fb7d57
...@@ -47,6 +47,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -47,6 +47,7 @@ class DynamoWorkerProcess(ManagedProcess):
env = os.environ.copy() env = os.environ.copy()
env["DYN_LOG"] = "debug" env["DYN_LOG"] = "debug"
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
# TODO: Replace hardcoded port with allocate_ports() for xdist-safe parallel execution
env["DYN_SYSTEM_PORT"] = "9345" env["DYN_SYSTEM_PORT"] = "9345"
env["DYN_KVBM_CPU_CACHE_GB"] = "20" env["DYN_KVBM_CPU_CACHE_GB"] = "20"
env["DYN_KVBM_DISK_CACHE_GB"] = "60" env["DYN_KVBM_DISK_CACHE_GB"] = "60"
......
...@@ -43,6 +43,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -43,6 +43,7 @@ class DynamoWorkerProcess(ManagedProcess):
env = os.environ.copy() env = os.environ.copy()
env["DYN_LOG"] = "debug" env["DYN_LOG"] = "debug"
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
# TODO: Replace hardcoded port with allocate_ports() for xdist-safe parallel execution
env["DYN_SYSTEM_PORT"] = "9345" env["DYN_SYSTEM_PORT"] = "9345"
# TODO: Have the managed process take a command name explicitly to distinguish # TODO: Have the managed process take a command name explicitly to distinguish
......
...@@ -62,6 +62,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -62,6 +62,7 @@ class DynamoWorkerProcess(ManagedProcess):
env = os.environ.copy() env = os.environ.copy()
env["DYN_LOG"] = "debug" env["DYN_LOG"] = "debug"
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
# TODO: Replace hardcoded port with allocate_ports() for xdist-safe parallel execution
env["DYN_SYSTEM_PORT"] = "9345" env["DYN_SYSTEM_PORT"] = "9345"
env["DYN_KVBM_CPU_CACHE_GB"] = "20" env["DYN_KVBM_CPU_CACHE_GB"] = "20"
env["DYN_KVBM_DISK_CACHE_GB"] = "60" env["DYN_KVBM_DISK_CACHE_GB"] = "60"
......
...@@ -117,6 +117,14 @@ class SGLangProcess: ...@@ -117,6 +117,14 @@ class SGLangProcess:
self.worker_processes = [] self.worker_processes = []
self.store_backend = store_backend self.store_backend = store_backend
# Dynamically allocate unique system and KV event ports (one per worker)
# to avoid conflicts in parallel test runs.
self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
self._kv_event_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
request.addfinalizer(
lambda: deallocate_ports(self._system_ports + self._kv_event_ports)
)
if sglang_args is None: if sglang_args is None:
sglang_args = {} sglang_args = {}
...@@ -181,8 +189,8 @@ class SGLangProcess: ...@@ -181,8 +189,8 @@ class SGLangProcess:
) )
# Add per-worker KV events config for ZMQ publishing # Add per-worker KV events config for ZMQ publishing
# Each worker needs a unique port to avoid conflicts # Ports are dynamically allocated for xdist-safe parallel execution.
kv_events_port = 20080 + worker_idx kv_events_port = self._kv_event_ports[worker_idx]
kv_events_config = f'{{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:{kv_events_port}"}}' kv_events_config = f'{{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:{kv_events_port}"}}'
command.extend(["--kv-events-config", kv_events_config]) command.extend(["--kv-events-config", kv_events_config])
...@@ -190,11 +198,16 @@ class SGLangProcess: ...@@ -190,11 +198,16 @@ class SGLangProcess:
if durable_kv_events: if durable_kv_events:
command.append("--durable-kv-events") command.append("--durable-kv-events")
# Each SGLang worker needs a unique DYN_SYSTEM_PORT to avoid conflicts.
# Ports are dynamically allocated for xdist-safe parallel execution.
system_port = self._system_ports[worker_idx]
env = os.environ.copy() # Copy parent environment env = os.environ.copy() # Copy parent environment
env_vars = { env_vars = {
"CUDA_VISIBLE_DEVICES": gpu_device, "CUDA_VISIBLE_DEVICES": gpu_device,
"DYN_NAMESPACE": self.namespace, "DYN_NAMESPACE": self.namespace,
"DYN_REQUEST_PLANE": request_plane, "DYN_REQUEST_PLANE": request_plane,
"DYN_SYSTEM_PORT": str(system_port),
"PYTHONHASHSEED": "0", # for deterministic event id's "PYTHONHASHSEED": "0", # for deterministic event id's
} }
...@@ -219,13 +232,13 @@ class SGLangProcess: ...@@ -219,13 +232,13 @@ class SGLangProcess:
if data_parallel_size is not None: if data_parallel_size is not None:
logger.info( logger.info(
f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} " f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} "
f"(mem_frac={mem_fraction_static}, kv_port={kv_events_port}) " f"(mem_frac={mem_fraction_static}, system_port={system_port}, kv_port={kv_events_port}) "
f"with endpoint: {self.endpoint}" f"with endpoint: {self.endpoint}"
) )
else: else:
logger.info( logger.info(
f"Created SGLang worker {worker_idx} on GPU {gpu_device} " f"Created SGLang worker {worker_idx} on GPU {gpu_device} "
f"(mem_frac={mem_fraction_static}, kv_port={kv_events_port}) " f"(mem_frac={mem_fraction_static}, system_port={system_port}, kv_port={kv_events_port}) "
f"with endpoint: {self.endpoint}" f"with endpoint: {self.endpoint}"
) )
......
...@@ -121,6 +121,11 @@ class TRTLLMProcess: ...@@ -121,6 +121,11 @@ class TRTLLMProcess:
self.worker_processes = [] self.worker_processes = []
self.store_backend = store_backend self.store_backend = store_backend
# Dynamically allocate unique system ports (one per worker) to avoid
# conflicts when tests run in parallel via pytest-xdist.
self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
request.addfinalizer(lambda: deallocate_ports(self._system_ports))
if trtllm_args is None: if trtllm_args is None:
trtllm_args = {} trtllm_args = {}
...@@ -182,8 +187,8 @@ class TRTLLMProcess: ...@@ -182,8 +187,8 @@ class TRTLLMProcess:
command.append("--enable-attention-dp") command.append("--enable-attention-dp")
# Each TRT-LLM worker needs a unique DYN_SYSTEM_PORT to avoid conflicts. # Each TRT-LLM worker needs a unique DYN_SYSTEM_PORT to avoid conflicts.
# See examples/backends/trtllm/launch/disagg_same_gpu.sh for reference. # Ports are dynamically allocated for xdist-safe parallel execution.
system_port = 8081 + worker_idx system_port = self._system_ports[worker_idx]
env = os.environ.copy() # Copy parent environment env = os.environ.copy() # Copy parent environment
env_vars = { env_vars = {
...@@ -191,7 +196,6 @@ class TRTLLMProcess: ...@@ -191,7 +196,6 @@ class TRTLLMProcess:
"DYN_NAMESPACE": self.namespace, "DYN_NAMESPACE": self.namespace,
"DYN_REQUEST_PLANE": request_plane, "DYN_REQUEST_PLANE": request_plane,
"PYTHONHASHSEED": "0", # for deterministic event id's "PYTHONHASHSEED": "0", # for deterministic event id's
# Set unique system port for each worker to avoid port conflicts
"DYN_SYSTEM_PORT": str(system_port), "DYN_SYSTEM_PORT": str(system_port),
} }
......
...@@ -118,6 +118,17 @@ class VLLMProcess: ...@@ -118,6 +118,17 @@ class VLLMProcess:
self.worker_processes = [] self.worker_processes = []
self.store_backend = store_backend self.store_backend = store_backend
# Dynamically allocate unique system, KV event, and NIXL side-channel
# ports (one of each per worker) to avoid conflicts in parallel test runs.
self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
self._kv_event_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
self._nixl_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
request.addfinalizer(
lambda: deallocate_ports(
self._system_ports + self._kv_event_ports + self._nixl_ports
)
)
if vllm_args is None: if vllm_args is None:
vllm_args = {} vllm_args = {}
...@@ -202,13 +213,19 @@ class VLLMProcess: ...@@ -202,13 +213,19 @@ class VLLMProcess:
if durable_kv_events: if durable_kv_events:
command.append("--durable-kv-events") command.append("--durable-kv-events")
# Ports are dynamically allocated for xdist-safe parallel execution.
system_port = self._system_ports[worker_idx]
kv_event_port = self._kv_event_ports[worker_idx]
nixl_port = self._nixl_ports[worker_idx]
env = os.environ.copy() # Copy parent environment env = os.environ.copy() # Copy parent environment
env_vars = { env_vars = {
"CUDA_VISIBLE_DEVICES": gpu_device, "CUDA_VISIBLE_DEVICES": gpu_device,
"DYN_NAMESPACE": self.namespace, "DYN_NAMESPACE": self.namespace,
"DYN_REQUEST_PLANE": request_plane, "DYN_REQUEST_PLANE": request_plane,
"DYN_VLLM_KV_EVENT_PORT": str(20080 + worker_idx), "DYN_SYSTEM_PORT": str(system_port),
"VLLM_NIXL_SIDE_CHANNEL_PORT": str(20090 + worker_idx), "DYN_VLLM_KV_EVENT_PORT": str(kv_event_port),
"VLLM_NIXL_SIDE_CHANNEL_PORT": str(nixl_port),
"PYTHONHASHSEED": "0", # for deterministic event id's "PYTHONHASHSEED": "0", # for deterministic event id's
} }
...@@ -233,13 +250,13 @@ class VLLMProcess: ...@@ -233,13 +250,13 @@ class VLLMProcess:
if data_parallel_size is not None: if data_parallel_size is not None:
logger.info( logger.info(
f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} " f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} "
f"(gpu_mem={gpu_memory_utilization}) " f"(gpu_mem={gpu_memory_utilization}, system_port={system_port}) "
f"with endpoint: {self.endpoint}" f"with endpoint: {self.endpoint}"
) )
else: else:
logger.info( logger.info(
f"Created vLLM worker {worker_idx} on GPU {gpu_device} " f"Created vLLM worker {worker_idx} on GPU {gpu_device} "
f"(gpu_mem={gpu_memory_utilization}) " f"(gpu_mem={gpu_memory_utilization}, system_port={system_port}) "
f"with endpoint: {self.endpoint}" f"with endpoint: {self.endpoint}"
) )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment