Unverified Commit b69c5079 authored by Alec's avatar Alec Committed by GitHub
Browse files

fix: add better port logic (#2175)

parent dabfea3d
...@@ -2,13 +2,9 @@ ...@@ -2,13 +2,9 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import asyncio
import json
import logging import logging
import os import os
import socket
import sys import sys
import time
from typing import Optional from typing import Optional
from vllm.config import KVTransferConfig from vllm.config import KVTransferConfig
...@@ -16,9 +12,20 @@ from vllm.distributed.kv_events import KVEventsConfig ...@@ -16,9 +12,20 @@ from vllm.distributed.kv_events import KVEventsConfig
from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.utils import FlexibleArgumentParser from vllm.utils import FlexibleArgumentParser
from .ports import (
DEFAULT_DYNAMO_PORT_MAX,
DEFAULT_DYNAMO_PORT_MIN,
DynamoPortRange,
EtcdContext,
PortAllocationRequest,
PortMetadata,
allocate_and_reserve_port,
allocate_and_reserve_port_block,
get_host_ip,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Only used if you run it manually from the command line
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate" DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
DEFAULT_MODEL = "Qwen/Qwen3-0.6B" DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
...@@ -34,6 +41,7 @@ class Config: ...@@ -34,6 +41,7 @@ class Config:
migration_limit: int = 0 migration_limit: int = 0
kv_port: Optional[int] = None kv_port: Optional[int] = None
side_channel_port: Optional[int] = None side_channel_port: Optional[int] = None
port_range: DynamoPortRange
# mirror vLLM # mirror vLLM
model: str model: str
...@@ -64,6 +72,18 @@ def parse_args() -> Config: ...@@ -64,6 +72,18 @@ def parse_args() -> Config:
default=0, default=0,
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.", help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
) )
parser.add_argument(
"--dynamo-port-min",
type=int,
default=DEFAULT_DYNAMO_PORT_MIN,
help=f"Minimum port number for Dynamo services (default: {DEFAULT_DYNAMO_PORT_MIN}). Must be in registered ports range (1024-49151).",
)
parser.add_argument(
"--dynamo-port-max",
type=int,
default=DEFAULT_DYNAMO_PORT_MAX,
help=f"Maximum port number for Dynamo services (default: {DEFAULT_DYNAMO_PORT_MAX}). Must be in registered ports range (1024-49151).",
)
parser = AsyncEngineArgs.add_cli_args(parser) parser = AsyncEngineArgs.add_cli_args(parser)
args = parser.parse_args() args = parser.parse_args()
...@@ -110,6 +130,9 @@ def parse_args() -> Config: ...@@ -110,6 +130,9 @@ def parse_args() -> Config:
config.engine_args = engine_args config.engine_args = engine_args
config.is_prefill_worker = args.is_prefill_worker config.is_prefill_worker = args.is_prefill_worker
config.migration_limit = args.migration_limit config.migration_limit = args.migration_limit
config.port_range = DynamoPortRange(
min=args.dynamo_port_min, max=args.dynamo_port_max
)
if config.engine_args.block_size is None: if config.engine_args.block_size is None:
config.engine_args.block_size = 16 config.engine_args.block_size = 16
...@@ -120,106 +143,66 @@ def parse_args() -> Config: ...@@ -120,106 +143,66 @@ def parse_args() -> Config:
return config return config
async def allocate_and_reserve_port(
namespace,
etcd_client,
worker_id: str,
reason: str,
max_attempts: int = 100,
) -> int:
"""
Get an OS-assigned port and atomically reserve it in ETCD.
Retries until successful or max_attempts reached.
Args:
max_attempts: Maximum number of ports to try (default: 100)
Raises:
RuntimeError: If unable to reserve a port within max_attempts
OSError: If unable to create sockets (system resource issues)
"""
node_name = socket.gethostname()
try:
node_ip = socket.gethostbyname(node_name)
except socket.gaierror:
# If hostname cannot be resolved, fall back to localhost
logger.warning(
f"Hostname '{node_name}' cannot be resolved, falling back to '127.0.0.1'"
)
node_ip = "127.0.0.1"
for attempt in range(1, max_attempts + 1):
# Hold socket open just long enough to reserve in ETCD
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", 0))
port = sock.getsockname()[1]
# Reserve in ETCD while holding the socket
key = f"dyn://{namespace}/ports/{node_ip}/{port}"
value = {
"worker_id": worker_id,
"reason": reason,
"reserved_at": time.time(),
"pid": os.getpid(),
}
try:
await etcd_client.kv_create(
key=key,
value=json.dumps(value).encode(),
lease_id=etcd_client.primary_lease_id(),
)
logger.debug(f"Reserved OS-assigned port {port} for {worker_id}")
return port
except Exception as e:
logger.debug(
f"Port {port} on {node_name} was already reserved (attempt {attempt}): {e}"
)
if attempt < max_attempts:
await asyncio.sleep(0.01)
raise RuntimeError(
f"Failed to allocate and reserve a port after {max_attempts} attempts"
)
async def configure_ports_with_etcd(config: Config, etcd_client): async def configure_ports_with_etcd(config: Config, etcd_client):
"""Configure all settings that require ETCD, including port allocation and vLLM overrides.""" """Configure all settings that require ETCD, including port allocation and vLLM overrides."""
# First, allocate ports etcd_context = EtcdContext(client=etcd_client, namespace=config.namespace)
dp_rank = config.engine_args.data_parallel_rank or 0 dp_rank = config.engine_args.data_parallel_rank or 0
worker_id = f"vllm-{config.component}-dp{dp_rank}" worker_id = f"vllm-{config.component}-dp{dp_rank}"
# Allocate KV events port # Allocate KV events port
kv_port = await allocate_and_reserve_port( if config.engine_args.enable_prefix_caching:
namespace=config.namespace, kv_metadata = PortMetadata(worker_id=worker_id, reason="zmq_kv_event_port")
etcd_client=etcd_client, kv_port = await allocate_and_reserve_port(
worker_id=f"{worker_id}", etcd_context=etcd_context,
reason="zmq_kv_event_port", metadata=kv_metadata,
port_range=config.port_range,
)
config.kv_port = kv_port
logger.info(f"Allocated ZMQ KV events port: {kv_port} (worker_id={worker_id})")
# Allocate side channel ports
# https://github.com/vllm-project/vllm/blob/releases/v0.10.0/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py#L372
# NIXL calculates ports as: base_port + (dp_rank * tp_size) + tp_rank
# For dp_rank, we need to reserve tp_size consecutive ports
tp_size = config.engine_args.tensor_parallel_size or 1
# The first port for this dp_rank will be at: base_port + (dp_rank * tp_size)
# We need to allocate tp_size consecutive ports starting from there
nixl_metadata = PortMetadata(worker_id=worker_id, reason="nixl_side_channel_port")
nixl_request = PortAllocationRequest(
etcd_context=etcd_context,
metadata=nixl_metadata,
port_range=config.port_range,
block_size=tp_size,
) )
allocated_ports = await allocate_and_reserve_port_block(nixl_request)
first_port_for_dp_rank = allocated_ports[0]
# Calculate the base port that NIXL expects
# base_port = first_port_for_dp_rank - (dp_rank * tp_size)
nixl_offset = dp_rank * tp_size
base_side_channel_port = first_port_for_dp_rank - nixl_offset
if base_side_channel_port < 0:
raise ValueError(
f"NIXL base port calculation resulted in negative port: "
f"first_allocated_port={first_port_for_dp_rank}, offset={nixl_offset}, "
f"base_port={base_side_channel_port}. Current range: {config.port_range.min}-{config.port_range.max}. "
f"Consider using a higher port range."
)
# Allocate side channel port config.side_channel_port = base_side_channel_port
side_channel_port = await allocate_and_reserve_port(
namespace=config.namespace,
etcd_client=etcd_client,
worker_id=f"{worker_id}",
reason="nixl_side_channel_port",
)
# Update config with allocated ports logger.info(
config.kv_port = kv_port f"Allocated NIXL side channel ports: base={base_side_channel_port}, "
config.side_channel_port = side_channel_port f"allocated_ports={allocated_ports} (worker_id={worker_id}, dp_rank={dp_rank}, tp_size={tp_size})"
)
def overwrite_args(config): def overwrite_args(config):
"""Set vLLM defaults for Dynamo.""" """Set vLLM defaults for Dynamo."""
assert (
config.kv_port is not None
), "Must set the kv_port, use configure_ports_with_etcd"
assert ( assert (
config.side_channel_port is not None config.side_channel_port is not None
), "Must set the kv_port, use configure_ports_with_etcd" ), "Must set the kv_port, use configure_ports_with_etcd"
...@@ -263,36 +246,6 @@ def overwrite_args(config): ...@@ -263,36 +246,6 @@ def overwrite_args(config):
raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.") raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.")
def get_host_ip() -> str:
"""Get the IP address of the host.
This is needed for the side channel to work in multi-node deployments.
"""
try:
host_name = socket.gethostname()
except socket.error as e:
logger.warning(f"Failed to get hostname: {e}, falling back to '127.0.0.1'")
return "127.0.0.1"
else:
try:
# Get the IP address of the hostname - this is needed for the side channel to work in multi-node deployments
host_ip = socket.gethostbyname(host_name)
# Test if the IP is actually usable by binding to it
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
test_socket.bind((host_ip, 0))
return host_ip
except socket.gaierror as e:
logger.warning(
f"Hostname '{host_name}' cannot be resolved: {e}, falling back to '127.0.0.1'"
)
return "127.0.0.1"
except socket.error as e:
# If hostname is not usable for binding, fall back to localhost
logger.warning(
f"Hostname '{host_name}' is not usable for binding: {e}, falling back to '127.0.0.1'"
)
return "127.0.0.1"
def set_side_channel_host_and_port(config: Config): def set_side_channel_host_and_port(config: Config):
"""vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors. """vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors.
This sets the port number for the side channel. This sets the port number for the side channel.
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Port allocation and management utilities for Dynamo services."""
import asyncio
import json
import logging
import os
import random
import socket
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from dynamo.runtime import EtcdKvCache
logger = logging.getLogger(__name__)
# Default port range in the registered ports section
DEFAULT_DYNAMO_PORT_MIN = 20000
DEFAULT_DYNAMO_PORT_MAX = 30000
@dataclass
class DynamoPortRange:
"""Port range configuration for Dynamo services"""
min: int
max: int
def __post_init__(self):
if self.min < 1024 or self.max > 49151:
raise ValueError(
f"Port range {self.min}-{self.max} is outside registered ports range (1024-49151)"
)
if self.min >= self.max:
raise ValueError(
f"Invalid port range: min ({self.min}) must be less than max ({self.max})"
)
@dataclass
class EtcdContext:
"""Context for ETCD operations"""
client: EtcdKvCache # etcd client instance
namespace: str # Namespace for keys (used in key prefix)
def make_port_key(self, port: int) -> str:
"""Generate ETCD key for a port reservation"""
node_ip = get_host_ip()
return f"dyn://{self.namespace}/ports/{node_ip}/{port}"
@dataclass
class PortMetadata:
"""Metadata to store with port reservations in ETCD"""
worker_id: str # Worker identifier (e.g., "vllm-backend-dp0")
reason: str # Purpose of the port (e.g., "nixl_side_channel_port")
block_info: dict = field(default_factory=dict) # Optional block allocation info
def to_etcd_value(self) -> dict:
"""Convert to dictionary for ETCD storage"""
value = {
"worker_id": self.worker_id,
"reason": self.reason,
"reserved_at": time.time(),
"pid": os.getpid(),
}
if self.block_info:
value.update(self.block_info)
return value
@dataclass
class PortAllocationRequest:
"""Parameters for port allocation"""
etcd_context: EtcdContext
metadata: PortMetadata
port_range: DynamoPortRange
block_size: int = 1
max_attempts: int = 100
@contextmanager
def hold_ports(ports: int | list[int]):
"""Context manager to hold port binding(s).
Holds socket bindings to ensure exclusive access to ports during reservation.
Can handle a single port or multiple ports.
Args:
ports: Single port number or list of port numbers to hold
"""
if isinstance(ports, int):
ports = [ports]
sockets = []
try:
for port in ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", port))
sockets.append(sock)
yield
finally:
for sock in sockets:
sock.close()
def check_port_available(port: int) -> bool:
"""Check if a specific port is available for binding."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("", port))
return True
except OSError:
return False
async def reserve_port_in_etcd(
etcd_context: EtcdContext,
port: int,
metadata: PortMetadata,
) -> None:
"""Reserve a single port in ETCD."""
key = etcd_context.make_port_key(port)
value = metadata.to_etcd_value()
await etcd_context.client.kv_create(
key=key,
value=json.dumps(value).encode(),
lease_id=etcd_context.client.primary_lease_id(),
)
async def allocate_and_reserve_port_block(request: PortAllocationRequest) -> list[int]:
"""
Allocate a contiguous block of ports from the specified range and atomically reserve them in ETCD.
Returns a list of all allocated ports in order.
This function uses a context manager to hold port bindings while reserving in ETCD,
preventing race conditions between multiple processes.
Args:
request: PortAllocationRequest containing all allocation parameters
Returns:
list[int]: List of all allocated ports in ascending order
Raises:
RuntimeError: If unable to reserve a port block within max_attempts
OSError: If unable to create sockets (system resource issues)
"""
# Create a list of valid starting ports (must have room for the entire block)
max_start_port = request.port_range.max - request.block_size + 1
if max_start_port < request.port_range.min:
raise ValueError(
f"Port range {request.port_range.min}-{request.port_range.max} is too small for block size {request.block_size}"
)
available_start_ports = list(range(request.port_range.min, max_start_port + 1))
random.shuffle(available_start_ports)
actual_max_attempts = min(len(available_start_ports), request.max_attempts)
for attempt in range(1, actual_max_attempts + 1):
start_port = available_start_ports[attempt - 1]
ports_to_reserve = list(range(start_port, start_port + request.block_size))
try:
# Try to bind to all ports in the block atomically
with hold_ports(ports_to_reserve):
logger.debug(
f"Successfully bound to ports {ports_to_reserve}, now reserving in ETCD"
)
# We have exclusive access to these ports, now reserve them in ETCD
for i, port in enumerate(ports_to_reserve):
port_metadata = PortMetadata(
worker_id=f"{request.metadata.worker_id}-{i}"
if request.block_size > 1
else request.metadata.worker_id,
reason=request.metadata.reason,
block_info={
"block_index": i,
"block_size": request.block_size,
"block_start": start_port,
}
if request.block_size > 1
else {},
)
await reserve_port_in_etcd(
etcd_context=request.etcd_context,
port=port,
metadata=port_metadata,
)
logger.debug(
f"Reserved port block {ports_to_reserve} from range {request.port_range.min}-{request.port_range.max} "
f"for {request.metadata.worker_id} (block_size={request.block_size})"
)
return ports_to_reserve
except OSError as e:
logger.debug(
f"Failed to bind to port block starting at {start_port} (attempt {attempt}): {e}"
)
except Exception as e:
logger.debug(
f"Failed to reserve port block starting at {start_port} in ETCD (attempt {attempt}): {e}"
)
if attempt < actual_max_attempts:
await asyncio.sleep(0.01)
raise RuntimeError(
f"Failed to allocate and reserve a port block of size {request.block_size} from range "
f"{request.port_range.min}-{request.port_range.max} after {actual_max_attempts} attempts"
)
async def allocate_and_reserve_port(
etcd_context: EtcdContext,
metadata: PortMetadata,
port_range: DynamoPortRange,
max_attempts: int = 100,
) -> int:
"""
Allocate a port from the specified range and atomically reserve it in ETCD.
This is a convenience wrapper around allocate_and_reserve_port_block with block_size=1.
Args:
etcd_context: ETCD context for operations
metadata: Port metadata for ETCD storage
port_range: DynamoPortRange object specifying min and max ports to try
max_attempts: Maximum number of ports to try (default: 100)
Returns:
int: The allocated port number
Raises:
RuntimeError: If unable to reserve a port within max_attempts
OSError: If unable to create sockets (system resource issues)
"""
request = PortAllocationRequest(
etcd_context=etcd_context,
metadata=metadata,
port_range=port_range,
block_size=1,
max_attempts=max_attempts,
)
allocated_ports = await allocate_and_reserve_port_block(request)
return allocated_ports[0] # Return the single allocated port
def get_host_ip() -> str:
"""Get the IP address of the host.
This is needed for the side channel to work in multi-node deployments.
"""
try:
host_name = socket.gethostname()
except socket.error as e:
logger.warning(f"Failed to get hostname: {e}, falling back to '127.0.0.1'")
return "127.0.0.1"
else:
try:
# Get the IP address of the hostname - this is needed for the side channel to work in multi-node deployments
host_ip = socket.gethostbyname(host_name)
# Test if the IP is actually usable by binding to it
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
test_socket.bind((host_ip, 0))
return host_ip
except socket.gaierror as e:
logger.warning(
f"Hostname '{host_name}' cannot be resolved: {e}, falling back to '127.0.0.1'"
)
return "127.0.0.1"
except socket.error as e:
# If hostname is not usable for binding, fall back to localhost
logger.warning(
f"Hostname '{host_name}' is not usable for binding: {e}, falling back to '127.0.0.1'"
)
return "127.0.0.1"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment