"lib/vscode:/vscode.git/clone" did not exist on "2a95ef631ac1604fa797fc676deb615253f4d9e0"
Unverified Commit c5ad4a87 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

test(router): reserve contiguous zmq ports (#7448)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent e5aebdfd
...@@ -40,7 +40,11 @@ from tests.router.helper import ( ...@@ -40,7 +40,11 @@ from tests.router.helper import (
) )
from tests.utils.constants import ROUTER_MODEL_NAME from tests.utils.constants import ROUTER_MODEL_NAME
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
from tests.utils.port_utils import allocate_ports, deallocate_ports from tests.utils.port_utils import (
allocate_contiguous_ports,
allocate_ports,
deallocate_ports,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -226,14 +230,12 @@ class MockerProcess: ...@@ -226,14 +230,12 @@ class MockerProcess:
# Alias for consistency with vLLM/SGLang workers # Alias for consistency with vLLM/SGLang workers
self.data_parallel_size = self.dp_size self.data_parallel_size = self.dp_size
# Allocate ZMQ base ports for KV event publishing. # Allocate contiguous ZMQ port blocks for KV event publishing because
# Each worker's DP ranks bind on base_port + dp_rank, so we need bases # the mocker binds base_port + dp_rank for each DP rank.
# spaced dp_size apart. Allocate num_mockers * dp_size ports total,
# then pick every dp_size'th port as a base.
if zmq_kv_events: if zmq_kv_events:
dp_size = mocker_args.get("dp_size", 1) dp_size = mocker_args.get("dp_size", 1)
self._zmq_kv_events_ports = allocate_ports( self._zmq_kv_events_ports = allocate_contiguous_ports(
num_mockers * dp_size, BASE_PORT_ZMQ num_mockers, dp_size, BASE_PORT_ZMQ
) )
bases = [self._zmq_kv_events_ports[i * dp_size] for i in range(num_mockers)] bases = [self._zmq_kv_events_ports[i * dp_size] for i in range(num_mockers)]
if not standalone_indexer: if not standalone_indexer:
...@@ -243,11 +245,11 @@ class MockerProcess: ...@@ -243,11 +245,11 @@ class MockerProcess:
f"(bases: {bases}) for {num_mockers} workers" f"(bases: {bases}) for {num_mockers} workers"
) )
# Allocate ZMQ replay ports (same layout as event ports) # Allocate contiguous ZMQ replay port blocks with the same layout.
if zmq_replay and zmq_kv_events: if zmq_replay and zmq_kv_events:
dp_size = mocker_args.get("dp_size", 1) dp_size = mocker_args.get("dp_size", 1)
self._zmq_replay_ports = allocate_ports( self._zmq_replay_ports = allocate_contiguous_ports(
num_mockers * dp_size, BASE_PORT_ZMQ + 1000 num_mockers, dp_size, BASE_PORT_ZMQ + 1000
) )
replay_bases = [ replay_bases = [
self._zmq_replay_ports[i * dp_size] for i in range(num_mockers) self._zmq_replay_ports[i * dp_size] for i in range(num_mockers)
......
...@@ -198,6 +198,126 @@ def allocate_ports(count: int, start_port: int) -> list[int]: ...@@ -198,6 +198,126 @@ def allocate_ports(count: int, start_port: int) -> list[int]:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def allocate_contiguous_ports(
count: int, block_size: int, start_port: int
) -> list[int]:
"""Find and return contiguous port blocks in i16 range with flock-based locking.
Args:
count: Number of contiguous blocks to allocate
block_size: Size of each contiguous block
start_port: Starting port number for allocation (required)
Returns:
list[int]: Flattened list of allocated ports grouped into contiguous blocks
"""
if count <= 0:
return []
if block_size <= 0:
raise ValueError(f"block_size must be positive, got {block_size}")
caller_file = "unknown"
caller_function = "unknown"
caller_line = 0
frame = inspect.currentframe()
if frame and frame.f_back:
caller_frame = frame.f_back
caller_info = inspect.getframeinfo(caller_frame)
caller_function = caller_frame.f_code.co_name
caller_file = caller_info.filename
caller_line = caller_info.lineno
if start_port < _PORT_MIN or start_port > _PORT_MAX:
raise ValueError(
f"start_port must be between {_PORT_MIN} and {_PORT_MAX}, got {start_port}"
)
if start_port + block_size - 1 > _PORT_MAX:
raise ValueError(
f"start_port {start_port} with block_size {block_size} exceeds {_PORT_MAX}"
)
_PORT_LOCK_FILE.parent.mkdir(parents=True, exist_ok=True)
_PORT_LOCK_FILE.touch(exist_ok=True)
if not os.access(_PORT_LOCK_FILE, os.W_OK):
raise PermissionError(
f"Port allocation lock file is not writable: {_PORT_LOCK_FILE}"
)
with open(_PORT_LOCK_FILE, "r+") as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
try:
registry = _load_port_registry()
registry = _cleanup_stale_allocations(registry)
allocated_ports = set(int(p) for p in registry.keys())
ports: list[int] = []
current_port = start_port + random.randint(0, 100)
if current_port + block_size - 1 > _PORT_MAX:
current_port = _PORT_MIN
max_retries = 500
attempts = 0
while len(ports) < count * block_size and attempts < max_retries:
attempts += 1
base_port = current_port
current_port += 1
if current_port + block_size - 1 > _PORT_MAX:
current_port = _PORT_MIN
candidate_ports = list(range(base_port, base_port + block_size))
if candidate_ports[-1] > _PORT_MAX:
continue
if any(
port in allocated_ports or port in ports for port in candidate_ports
):
continue
sockets: list[socket.socket] = []
try:
for port in candidate_ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("", port))
sockets.append(sock)
except OSError:
for sock in sockets:
sock.close()
continue
for sock in sockets:
sock.close()
ports.extend(candidate_ports)
timestamp = time.time()
for port in candidate_ports:
registry[str(port)] = {
"timestamp": timestamp,
"caller_file": caller_file,
"caller_function": caller_function,
"caller_line": caller_line,
}
if len(ports) < count * block_size:
raise RuntimeError(
f"Could not find {count} contiguous port blocks of size {block_size} "
f"after {max_retries} retries"
)
_save_port_registry(registry)
return ports
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def allocate_port(start_port: int) -> int: def allocate_port(start_port: int) -> int:
"""Find and return a single available port in i16 range. """Find and return a single available port in i16 range.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment