Unverified Commit a629b86a authored by Alec's avatar Alec Committed by GitHub
Browse files

refactor: remove port reservation system in favor of manual specification (#4142)


Signed-off-by: default avataralec-flowers <aflowers@nvidia.com>
parent 6336eea4
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
import logging import logging
import os import os
import socket
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from vllm.config import KVTransferConfig from vllm.config import KVTransferConfig
...@@ -13,24 +14,12 @@ from vllm.utils import FlexibleArgumentParser ...@@ -13,24 +14,12 @@ from vllm.utils import FlexibleArgumentParser
from dynamo._core import get_reasoning_parser_names, get_tool_parser_names from dynamo._core import get_reasoning_parser_names, get_tool_parser_names
from dynamo.common.config_dump import add_config_dump_args, register_encoder from dynamo.common.config_dump import add_config_dump_args, register_encoder
from dynamo.runtime import DistributedRuntime
from . import __version__, envs
from . import __version__
from .ports import (
DEFAULT_DYNAMO_PORT_MAX,
DEFAULT_DYNAMO_PORT_MIN,
DynamoPortRange,
PortAllocationRequest,
PortMetadata,
allocate_and_reserve_port,
allocate_and_reserve_port_block,
get_host_ip,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
DEFAULT_MODEL = "Qwen/Qwen3-0.6B" DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
VALID_CONNECTORS = {"nixl", "lmcache", "kvbm", "null", "none"} VALID_CONNECTORS = {"nixl", "lmcache", "kvbm", "null", "none"}
# Global LMCache configuration - initialize once on module import # Global LMCache configuration - initialize once on module import
...@@ -48,7 +37,6 @@ class Config: ...@@ -48,7 +37,6 @@ class Config:
is_decode_worker: bool is_decode_worker: bool
migration_limit: int = 0 migration_limit: int = 0
kv_port: Optional[int] = None kv_port: Optional[int] = None
port_range: DynamoPortRange
custom_jinja_template: Optional[str] = None custom_jinja_template: Optional[str] = None
# mirror vLLM # mirror vLLM
...@@ -115,18 +103,6 @@ def parse_args() -> Config: ...@@ -115,18 +103,6 @@ def parse_args() -> Config:
default=0, default=0,
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.", help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
) )
parser.add_argument(
"--dynamo-port-min",
type=int,
default=DEFAULT_DYNAMO_PORT_MIN,
help=f"Minimum port number for Dynamo services (default: {DEFAULT_DYNAMO_PORT_MIN}). Must be in registered ports range (1024-49151).",
)
parser.add_argument(
"--dynamo-port-max",
type=int,
default=DEFAULT_DYNAMO_PORT_MAX,
help=f"Maximum port number for Dynamo services (default: {DEFAULT_DYNAMO_PORT_MAX}). Must be in registered ports range (1024-49151).",
)
parser.add_argument( parser.add_argument(
"--connector", "--connector",
nargs="*", nargs="*",
...@@ -249,9 +225,6 @@ def parse_args() -> Config: ...@@ -249,9 +225,6 @@ def parse_args() -> Config:
config.is_prefill_worker = args.is_prefill_worker config.is_prefill_worker = args.is_prefill_worker
config.is_decode_worker = args.is_decode_worker config.is_decode_worker = args.is_decode_worker
config.migration_limit = args.migration_limit config.migration_limit = args.migration_limit
config.port_range = DynamoPortRange(
min=args.dynamo_port_min, max=args.dynamo_port_max
)
config.tool_call_parser = args.dyn_tool_call_parser config.tool_call_parser = args.dyn_tool_call_parser
config.reasoning_parser = args.dyn_reasoning_parser config.reasoning_parser = args.dyn_reasoning_parser
config.custom_jinja_template = args.custom_jinja_template config.custom_jinja_template = args.custom_jinja_template
...@@ -315,67 +288,14 @@ def parse_args() -> Config: ...@@ -315,67 +288,14 @@ def parse_args() -> Config:
return config return config
async def configure_ports(runtime: DistributedRuntime, config: Config): async def configure_ports(config: Config):
"""Configure including port allocation and vLLM overrides.""" """Configure port settings from dedicated environment overrides."""
dp_rank = config.engine_args.data_parallel_rank or 0
worker_id = f"vllm-{config.component}-dp{dp_rank}"
# Allocate KV events port
if config.engine_args.enable_prefix_caching: if config.engine_args.enable_prefix_caching:
kv_metadata = PortMetadata(worker_id=worker_id, reason="zmq_kv_event_port") config.kv_port = envs.DYN_VLLM_KV_EVENT_PORT
kv_port = await allocate_and_reserve_port(
runtime=runtime,
namespace=config.namespace,
metadata=kv_metadata,
port_range=config.port_range,
)
config.kv_port = kv_port
logger.info(f"Allocated ZMQ KV events port: {kv_port} (worker_id={worker_id})")
# Check if NIXL is needed based on connector list
needs_nixl = config.has_connector("nixl")
if needs_nixl:
# Allocate side channel ports
# https://github.com/vllm-project/vllm/blob/releases/v0.10.0/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py#L372
# NIXL calculates ports as: base_port + (dp_rank * tp_size) + tp_rank
# For dp_rank, we need to reserve tp_size consecutive ports
tp_size = config.engine_args.tensor_parallel_size or 1
# The first port for this dp_rank will be at: base_port + (dp_rank * tp_size)
# We need to allocate tp_size consecutive ports starting from there
nixl_metadata = PortMetadata(
worker_id=worker_id, reason="nixl_side_channel_port"
)
nixl_request = PortAllocationRequest(
metadata=nixl_metadata,
port_range=config.port_range,
block_size=tp_size,
)
allocated_ports = await allocate_and_reserve_port_block(
runtime, config.namespace, nixl_request
)
first_port_for_dp_rank = allocated_ports[0]
# Calculate the base port that NIXL expects
# base_port = first_port_for_dp_rank - (dp_rank * tp_size)
nixl_offset = dp_rank * tp_size
base_side_channel_port = first_port_for_dp_rank - nixl_offset
if base_side_channel_port < 0: if config.has_connector("nixl"):
raise ValueError( ensure_side_channel_host()
f"NIXL base port calculation resulted in negative port: "
f"first_allocated_port={first_port_for_dp_rank}, offset={nixl_offset}, "
f"base_port={base_side_channel_port}. Current range: {config.port_range.min}-{config.port_range.max}. "
f"Consider using a higher port range."
)
logger.info(
f"Allocated NIXL side channel ports: base={base_side_channel_port}, "
f"allocated_ports={allocated_ports} (worker_id={worker_id}, dp_rank={dp_rank}, tp_size={tp_size})"
)
set_side_channel_host_and_port(base_side_channel_port)
def create_kv_events_config(config: Config) -> Optional[KVEventsConfig]: def create_kv_events_config(config: Config) -> Optional[KVEventsConfig]:
...@@ -385,18 +305,18 @@ def create_kv_events_config(config: Config) -> Optional[KVEventsConfig]: ...@@ -385,18 +305,18 @@ def create_kv_events_config(config: Config) -> Optional[KVEventsConfig]:
return None return None
# If user provided their own config, use that # If user provided their own config, use that
if getattr(config.engine_args, "kv_events_config"): if c := getattr(config.engine_args, "kv_events_config"):
logger.info("Using user-provided kv_events_config") logger.info(f"Using user-provided kv_events_config {c}")
return None return None
# Create default events config for prefix caching # Create default events config for prefix caching
logger.info("Creating Dynamo default kv_events_config for prefix caching")
if config.kv_port is None: if config.kv_port is None:
raise ValueError( raise ValueError(
"config.kv_port is not set; call configure_ports(...) before overwrite_args " "config.kv_port is not set; call configure_ports(...) before overwrite_args "
"or provide --kv-event-config to supply an explicit endpoint." "or provide --kv-event-config to supply an explicit endpoint."
) )
dp_rank = config.engine_args.data_parallel_rank or 0 dp_rank = config.engine_args.data_parallel_rank or 0
return KVEventsConfig( return KVEventsConfig(
enable_kv_cache_events=True, enable_kv_cache_events=True,
publisher="zmq", publisher="zmq",
...@@ -472,6 +392,10 @@ def overwrite_args(config): ...@@ -472,6 +392,10 @@ def overwrite_args(config):
defaults["kv_transfer_config"] = kv_transfer_config defaults["kv_transfer_config"] = kv_transfer_config
kv_events_config = create_kv_events_config(config) kv_events_config = create_kv_events_config(config)
logger.info(
f"Using Dynamo default kv_events_config for publishing kv events over zmq: {kv_events_config}"
)
if kv_events_config: if kv_events_config:
defaults["kv_events_config"] = kv_events_config defaults["kv_events_config"] = kv_events_config
...@@ -484,11 +408,45 @@ def overwrite_args(config): ...@@ -484,11 +408,45 @@ def overwrite_args(config):
raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.") raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.")
def set_side_channel_host_and_port(side_channel_port: int): def get_host_ip() -> str:
"""vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors. """Get the IP address of the host for side-channel coordination."""
This sets the port number for the side channel. try:
""" host_name = socket.gethostname()
except socket.error as exc:
logger.warning("Failed to get hostname: %s, falling back to 127.0.0.1", exc)
return "127.0.0.1"
try:
host_ip = socket.gethostbyname(host_name)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
test_socket.bind((host_ip, 0))
return host_ip
except socket.gaierror as exc:
logger.warning(
"Hostname %s cannot be resolved: %s, falling back to 127.0.0.1",
host_name,
exc,
)
return "127.0.0.1"
except socket.error as exc:
logger.warning(
"Hostname %s is not usable for binding: %s, falling back to 127.0.0.1",
host_name,
exc,
)
return "127.0.0.1"
def ensure_side_channel_host():
"""Ensure the NIXL side-channel host is available without overriding user settings."""
existing_host = os.getenv("VLLM_NIXL_SIDE_CHANNEL_HOST")
if existing_host:
logger.debug(
"Preserving existing VLLM_NIXL_SIDE_CHANNEL_HOST=%s", existing_host
)
return
host_ip = get_host_ip() host_ip = get_host_ip()
os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = host_ip os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = host_ip
os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(side_channel_port) logger.debug("Set VLLM_NIXL_SIDE_CHANNEL_HOST to %s", host_ip)
logger.debug(f"Set NIXL side channel to {host_ip}:{side_channel_port}")
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Environment variable configuration for Dynamo vLLM integration.
This module provides a centralized location for managing environment variables
used by Dynamo's vLLM backend, following vLLM's pattern.
"""
import os
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
# Port range constants: inclusive bounds that _resolve_port enforces on every
# resolved port value.
REGISTERED_PORT_MIN = 1024
REGISTERED_PORT_MAX = 49151

# Declared for static type checkers only. At runtime this name is not a real
# module global; it is served by the module-level __getattr__ through
# environment_variables.
if TYPE_CHECKING:
    DYN_VLLM_KV_EVENT_PORT: int = 20080
def _resolve_port(env_var: str, default_port: int) -> int:
    """
    Look up a port number in the environment and validate it.

    The default is used only when the variable is absent. Whichever value is
    chosen must lie inside the registered port range.

    Args:
        env_var: Name of the environment variable to read
        default_port: Port to use when the variable is not set

    Returns:
        A port within REGISTERED_PORT_MIN..REGISTERED_PORT_MAX (inclusive)

    Raises:
        ValueError: If the variable is not an integer or the port is out of range
    """
    env_value = os.environ.get(env_var)

    if env_value is not None:
        try:
            port = int(env_value)
        except ValueError as exc:
            raise ValueError(
                f"{env_var} must be an integer port number, got {env_value!r}."
            ) from exc
    else:
        port = default_port

    # The range check applies to the default as well as to user-supplied values.
    if port < REGISTERED_PORT_MIN or port > REGISTERED_PORT_MAX:
        raise ValueError(
            f"{env_var} port {port} is outside of the registered port range "
            f"({REGISTERED_PORT_MIN}-{REGISTERED_PORT_MAX})."
        )

    return port
# Environment variables configuration.
# Maps each supported variable name to a zero-argument resolver. The resolver
# runs on every attribute access (see the module-level __getattr__), so the
# environment is read lazily and the result is never cached.
environment_variables: dict[str, Callable[[], Any]] = {
    # Port used for KV events publishing to the frontend (default 20080).
    # Note: This env variable is ignored if explicitly using --kv-events-config ''
    "DYN_VLLM_KV_EVENT_PORT": lambda: _resolve_port("DYN_VLLM_KV_EVENT_PORT", 20080),
}
def __getattr__(name: str):
    """
    Resolve a configured environment variable on attribute access.

    Python calls this module-level hook only for names that normal module
    lookup did not find. Known names are re-resolved on each access; any
    other name raises AttributeError.
    """
    resolver = environment_variables.get(name)
    if resolver is None:
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    return resolver()
def __dir__():
    """List the names of the environment variables this module exposes."""
    return [*environment_variables]
def is_set(name: str) -> bool:
    """Report whether a known environment variable is present in os.environ.

    Raises:
        AttributeError: If ``name`` is not one of the configured variables
    """
    if name not in environment_variables:
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    return name in os.environ
...@@ -74,7 +74,7 @@ async def graceful_shutdown(runtime): ...@@ -74,7 +74,7 @@ async def graceful_shutdown(runtime):
async def worker(runtime: DistributedRuntime): async def worker(runtime: DistributedRuntime):
config = parse_args() config = parse_args()
await configure_ports(runtime, config) await configure_ports(config)
overwrite_args(config) overwrite_args(config)
# Set up signal handler for graceful shutdown # Set up signal handler for graceful shutdown
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Port allocation and management utilities for Dynamo services."""
import json
import logging
import os
import socket
import time
from dataclasses import dataclass
from dynamo.runtime import DistributedRuntime
logger = logging.getLogger(__name__)
# Default port range in the registered ports section
DEFAULT_DYNAMO_PORT_MIN = 20000
DEFAULT_DYNAMO_PORT_MAX = 30000
@dataclass
class DynamoPortRange:
    """Bounds of the port range used for Dynamo services.

    Validation runs on construction: the range must stay inside the
    registered ports range (1024-49151) and ``min`` must be strictly below
    ``max``.
    """

    min: int  # lower bound of the range
    max: int  # upper bound of the range

    def __post_init__(self):
        """Reject ranges that leave the registered ports or are empty/inverted."""
        inside_registered = 1024 <= self.min and self.max <= 49151
        if not inside_registered:
            raise ValueError(
                f"Port range {self.min}-{self.max} is outside registered ports range (1024-49151)"
            )

        if not self.min < self.max:
            raise ValueError(
                f"Invalid port range: min ({self.min}) must be less than max ({self.max})"
            )
@dataclass
class PortMetadata:
    """Context recorded alongside a port reservation.

    Attributes:
        worker_id: Worker identifier (e.g., "vllm-backend-dp0")
        reason: Purpose of the port (e.g., "nixl_side_channel_port")
    """

    worker_id: str
    reason: str
@dataclass
class PortAllocationRequest:
    """Describes a request for ``block_size`` ports out of ``port_range``.

    Attributes:
        metadata: Context to store with the reservation
        port_range: Range the ports are requested from
        block_size: Number of ports requested (defaults to 1)

    Raises:
        ValueError: On construction, if ``block_size`` is below 1 or larger
            than the number of ports in ``port_range``
    """

    metadata: PortMetadata
    port_range: DynamoPortRange
    block_size: int = 1

    def __post_init__(self):
        """Check that the requested block could fit in the range at all."""
        if not self.block_size >= 1:
            raise ValueError("block_size must be >= 1")

        # Both bounds count as usable ports, hence the +1.
        range_len = self.port_range.max - self.port_range.min + 1
        if range_len < self.block_size:
            raise ValueError(
                f"block_size {self.block_size} exceeds range length {range_len} "
                f"({self.port_range.min}-{self.port_range.max})"
            )
def check_port_available(port: int) -> bool:
    """Return True if a TCP/IPv4 socket can be bound to ``port`` on all interfaces.

    Any OSError while creating or binding the probe socket is reported as
    "not available". The probe socket is closed before returning.
    """
    try:
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.bind(("", port))
        finally:
            probe.close()
    except OSError:
        return False
    return True
async def allocate_and_reserve_port_block(
    runtime: DistributedRuntime, namespace: str, request: PortAllocationRequest
) -> list[int]:
    """
    Request a block of ports from the runtime within the requested range.

    Port selection and reservation are delegated to
    ``runtime.allocate_port_block``. This function only builds the JSON
    context (worker id, reason, timestamp, pid, block size) passed with the
    request.

    Args:
        runtime: DistributedRuntime whose ``allocate_port_block`` does the allocation
        namespace: Namespace forwarded to the runtime call
        request: PortAllocationRequest containing all allocation parameters

    Returns:
        list[int]: The result of ``runtime.allocate_port_block``, expected to be
        the allocated ports in ascending order
    """
    meta = request.metadata
    bounds = request.port_range

    # Serialized context passed as the last argument of the runtime call.
    context = json.dumps(
        {
            "worker_id": str(meta.worker_id),
            "reason": meta.reason,
            "reserved_at": time.time(),
            "pid": os.getpid(),
            "block_size": request.block_size,
        }
    )

    return await runtime.allocate_port_block(
        namespace, bounds.min, bounds.max, request.block_size, context
    )
async def allocate_and_reserve_port(
    runtime: DistributedRuntime,
    namespace: str,
    metadata: PortMetadata,
    port_range: DynamoPortRange,
) -> int:
    """
    Reserve a single port from the given range.

    Thin wrapper over allocate_and_reserve_port_block that asks for a block
    of exactly one port.

    Args:
        runtime: DistributedRuntime forwarded to the block allocator
        namespace: Namespace forwarded to the block allocator
        metadata: Port metadata / context
        port_range: DynamoPortRange object specifying min and max ports to try

    Returns:
        int: The allocated port number

    Raises:
        RuntimeError: If the block allocation comes back empty
    """
    single_port_request = PortAllocationRequest(
        metadata=metadata, port_range=port_range, block_size=1
    )
    ports = await allocate_and_reserve_port_block(
        runtime, namespace, single_port_request
    )
    if ports:
        return ports[0]
    raise RuntimeError("Failed to allocate required ports")
def get_host_ip() -> str:
    """Return an IP address for this host, or "127.0.0.1" if none is usable.

    The hostname is resolved and the resulting address is accepted only if a
    TCP socket can be bound to it. This is needed for the side channel to
    work in multi-node deployments.
    """
    fallback_ip = "127.0.0.1"

    try:
        host_name = socket.gethostname()
    except OSError as e:
        logger.warning(f"Failed to get hostname: {e}, falling back to '127.0.0.1'")
        return fallback_ip

    try:
        host_ip = socket.gethostbyname(host_name)
        # Bind to an ephemeral port to check the address is actually usable.
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.bind((host_ip, 0))
        finally:
            probe.close()
    except socket.gaierror as e:
        # Resolution failure; must be handled before the broader OSError case.
        logger.warning(
            f"Hostname '{host_name}' cannot be resolved: {e}, falling back to '127.0.0.1'"
        )
        return fallback_ip
    except OSError as e:
        # The address resolved but cannot be bound; fall back to localhost.
        logger.warning(
            f"Hostname '{host_name}' is not usable for binding: {e}, falling back to '127.0.0.1'"
        )
        return fallback_ip

    return host_ip
...@@ -25,12 +25,17 @@ CUDA_VISIBLE_DEVICES=0 DYN_KVBM_CPU_CACHE_GB=2 \ ...@@ -25,12 +25,17 @@ CUDA_VISIBLE_DEVICES=0 DYN_KVBM_CPU_CACHE_GB=2 \
python3 -m dynamo.vllm \ python3 -m dynamo.vllm \
--model $MODEL \ --model $MODEL \
--enforce-eager \ --enforce-eager \
--connector kvbm --gpu-memory-utilization 0.4 & --connector kvbm \
--gpu-memory-utilization 0.4 \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5556"}' &
DYN_KVBM_LEADER_ZMQ_PUB_PORT=56003 \ DYN_KVBM_LEADER_ZMQ_PUB_PORT=56003 \
DYN_KVBM_LEADER_ZMQ_ACK_PORT=56004 \ DYN_KVBM_LEADER_ZMQ_ACK_PORT=56004 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=0 DYN_KVBM_CPU_CACHE_GB=2 \ CUDA_VISIBLE_DEVICES=0 DYN_KVBM_CPU_CACHE_GB=2 \
python3 -m dynamo.vllm \ python3 -m dynamo.vllm \
--model $MODEL \ --model $MODEL \
--enforce-eager \ --enforce-eager \
--connector kvbm --gpu-memory-utilization 0.4 --connector kvbm \
--gpu-memory-utilization 0.4 \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557"}'
...@@ -23,10 +23,13 @@ CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \ ...@@ -23,10 +23,13 @@ CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \
--model $MODEL \ --model $MODEL \
--block-size $BLOCK_SIZE \ --block-size $BLOCK_SIZE \
--enforce-eager \ --enforce-eager \
--connector none & --connector none \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5556"}' &
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \
--model $MODEL \ --model $MODEL \
--block-size $BLOCK_SIZE \ --block-size $BLOCK_SIZE \
--enforce-eager \ --enforce-eager \
--connector none --connector none \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557"}'
...@@ -12,6 +12,8 @@ python -m dynamo.frontend --router-mode kv --http-port=8000 & ...@@ -12,6 +12,8 @@ python -m dynamo.frontend --router-mode kv --http-port=8000 &
# Chose Qwen3-30B because its a small MOE that can fit on smaller GPUs (L40S for example) # Chose Qwen3-30B because its a small MOE that can fit on smaller GPUs (L40S for example)
# --enforce-eager is added for quick deployment. for production use, need to remove this flag # --enforce-eager is added for quick deployment. for production use, need to remove this flag
for i in {0..3}; do for i in {0..3}; do
DYN_VLLM_KV_EVENT_PORT=$((20080 + i)) \
VLLM_NIXL_SIDE_CHANNEL_PORT=$((20096 + i)) \
CUDA_VISIBLE_DEVICES=$i python3 -m dynamo.vllm \ CUDA_VISIBLE_DEVICES=$i python3 -m dynamo.vllm \
--model Qwen/Qwen3-30B-A3B \ --model Qwen/Qwen3-30B-A3B \
--data-parallel-rank $i \ --data-parallel-rank $i \
......
...@@ -10,6 +10,8 @@ python -m dynamo.frontend --http-port=8000 & ...@@ -10,6 +10,8 @@ python -m dynamo.frontend --http-port=8000 &
# --enforce-eager is added for quick deployment. for production use, need to remove this flag # --enforce-eager is added for quick deployment. for production use, need to remove this flag
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager & CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager &
DYN_VLLM_KV_EVENT_PORT=20081 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \
--model Qwen/Qwen3-0.6B \ --model Qwen/Qwen3-0.6B \
--enforce-eager \ --enforce-eager \
......
...@@ -13,6 +13,8 @@ CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --connecto ...@@ -13,6 +13,8 @@ CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --connecto
# run prefill worker on GPU 1 with KVBM enabled using 20GB of CPU cache # run prefill worker on GPU 1 with KVBM enabled using 20GB of CPU cache
# NOTE: remove --enforce-eager for production use # NOTE: remove --enforce-eager for production use
DYN_VLLM_KV_EVENT_PORT=20081 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
DYN_KVBM_CPU_CACHE_GB=20 \ DYN_KVBM_CPU_CACHE_GB=20 \
CUDA_VISIBLE_DEVICES=1 \ CUDA_VISIBLE_DEVICES=1 \
python3 -m dynamo.vllm \ python3 -m dynamo.vllm \
......
...@@ -10,11 +10,15 @@ python -m dynamo.frontend --router-mode kv --http-port=8000 & ...@@ -10,11 +10,15 @@ python -m dynamo.frontend --router-mode kv --http-port=8000 &
# run decode workers on GPU 0 and 1, without enabling KVBM # run decode workers on GPU 0 and 1, without enabling KVBM
# NOTE: remove --enforce-eager for production use # NOTE: remove --enforce-eager for production use
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --connector nixl --enforce-eager & CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --connector nixl --enforce-eager &
DYN_VLLM_KV_EVENT_PORT=20081 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --connector nixl --enforce-eager & CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --connector nixl --enforce-eager &
# run prefill workers on GPU 2 and 3 with KVBM enabled using 20GB of CPU cache # run prefill workers on GPU 2 and 3 with KVBM enabled using 20GB of CPU cache
# NOTE: use different barrier id prefixes for each prefill worker to avoid conflicts # NOTE: use different barrier id prefixes for each prefill worker to avoid conflicts
# NOTE: remove --enforce-eager for production use # NOTE: remove --enforce-eager for production use
DYN_VLLM_KV_EVENT_PORT=20082 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20098 \
DYN_KVBM_CPU_CACHE_GB=20 \ DYN_KVBM_CPU_CACHE_GB=20 \
CUDA_VISIBLE_DEVICES=2 \ CUDA_VISIBLE_DEVICES=2 \
python3 -m dynamo.vllm \ python3 -m dynamo.vllm \
...@@ -23,6 +27,8 @@ CUDA_VISIBLE_DEVICES=2 \ ...@@ -23,6 +27,8 @@ CUDA_VISIBLE_DEVICES=2 \
--connector kvbm nixl \ --connector kvbm nixl \
--enforce-eager & --enforce-eager &
DYN_VLLM_KV_EVENT_PORT=20083 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20099 \
DYN_KVBM_LEADER_ZMQ_PUB_PORT=56003 \ DYN_KVBM_LEADER_ZMQ_PUB_PORT=56003 \
DYN_KVBM_LEADER_ZMQ_ACK_PORT=56004 \ DYN_KVBM_LEADER_ZMQ_ACK_PORT=56004 \
DYN_KVBM_CPU_CACHE_GB=20 \ DYN_KVBM_CPU_CACHE_GB=20 \
......
...@@ -14,6 +14,8 @@ CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B & ...@@ -14,6 +14,8 @@ CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B &
sleep 20 sleep 20
# run prefill worker on GPU 1 with LMCache # run prefill worker on GPU 1 with LMCache
DYN_VLLM_KV_EVENT_PORT=20081 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
ENABLE_LMCACHE=1 \ ENABLE_LMCACHE=1 \
LMCACHE_CHUNK_SIZE=256 \ LMCACHE_CHUNK_SIZE=256 \
LMCACHE_LOCAL_CPU=True \ LMCACHE_LOCAL_CPU=True \
......
...@@ -23,24 +23,31 @@ python -m dynamo.frontend \ ...@@ -23,24 +23,31 @@ python -m dynamo.frontend \
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \ CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \
--model $MODEL \ --model $MODEL \
--block-size $BLOCK_SIZE \ --block-size $BLOCK_SIZE \
--enforce-eager & --enforce-eager \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5556"}' &
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \
--model $MODEL \ --model $MODEL \
--block-size $BLOCK_SIZE \ --block-size $BLOCK_SIZE \
--enforce-eager & --enforce-eager \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557"}' &
# two prefill workers # two prefill workers
# When registered with --is-prefill-worker, these workers are automatically detected # When registered with --is-prefill-worker, these workers are automatically detected
# by the frontend, which activates an internal prefill router for KV-aware prefill routing # by the frontend, which activates an internal prefill router for KV-aware prefill routing
VLLM_NIXL_SIDE_CHANNEL_PORT=20098 \
CUDA_VISIBLE_DEVICES=2 python3 -m dynamo.vllm \ CUDA_VISIBLE_DEVICES=2 python3 -m dynamo.vllm \
--model $MODEL \ --model $MODEL \
--block-size $BLOCK_SIZE \ --block-size $BLOCK_SIZE \
--enforce-eager \ --enforce-eager \
--is-prefill-worker & --is-prefill-worker \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5558"}'&
VLLM_NIXL_SIDE_CHANNEL_PORT=20099 \
CUDA_VISIBLE_DEVICES=3 python3 -m dynamo.vllm \ CUDA_VISIBLE_DEVICES=3 python3 -m dynamo.vllm \
--model $MODEL \ --model $MODEL \
--block-size $BLOCK_SIZE \ --block-size $BLOCK_SIZE \
--enforce-eager \ --enforce-eager \
--is-prefill-worker --is-prefill-worker \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5559"}'
...@@ -93,6 +93,8 @@ mkdir -p $LOG_DIR ...@@ -93,6 +93,8 @@ mkdir -p $LOG_DIR
for ((i=0; i<GPUS_PER_NODE; i++)); do for ((i=0; i<GPUS_PER_NODE; i++)); do
dp_rank=$((i + NODE_RANK * GPUS_PER_NODE)) dp_rank=$((i + NODE_RANK * GPUS_PER_NODE))
CUDA_VISIBLE_DEVICES=$i \ CUDA_VISIBLE_DEVICES=$i \
DYN_VLLM_KV_EVENT_PORT=$((20080 + i)) \
VLLM_NIXL_SIDE_CHANNEL_PORT=$((20096 + i)) \
VLLM_ALL2ALL_BACKEND="deepep_low_latency" \ VLLM_ALL2ALL_BACKEND="deepep_low_latency" \
VLLM_USE_DEEP_GEMM=1 \ VLLM_USE_DEEP_GEMM=1 \
VLLM_RANDOMIZE_DP_DUMMY_INPUTS=1 \ VLLM_RANDOMIZE_DP_DUMMY_INPUTS=1 \
......
...@@ -64,6 +64,12 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -64,6 +64,12 @@ class DynamoWorkerProcess(ManagedProcess):
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
env["DYN_SYSTEM_PORT"] = port env["DYN_SYSTEM_PORT"] = port
# Set KV event port and NIXL side channel port only for prefill worker
# to avoid conflicts with decode worker
if is_prefill:
env["DYN_VLLM_KV_EVENT_PORT"] = "20082"
env["VLLM_NIXL_SIDE_CHANNEL_PORT"] = "5601"
# Set log directory based on worker type # Set log directory based on worker type
worker_type = "prefill_worker" if is_prefill else "worker" worker_type = "prefill_worker" if is_prefill else "worker"
log_dir = f"{request.node.name}_{worker_type}" log_dir = f"{request.node.name}_{worker_type}"
......
...@@ -47,6 +47,9 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -47,6 +47,9 @@ class DynamoWorkerProcess(ManagedProcess):
# Set debug logging environment # Set debug logging environment
env = os.environ.copy() env = os.environ.copy()
env["DYN_VLLM_KV_EVENT_PORT"] = f"2008{worker_id[-1]}"
env["VLLM_NIXL_SIDE_CHANNEL_PORT"] = f"560{worker_id[-1]}"
env["DYN_LOG"] = "debug" env["DYN_LOG"] = "debug"
env["DYN_SYSTEM_ENABLED"] = "true" env["DYN_SYSTEM_ENABLED"] = "true"
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment