Unverified Commit 6da23fcf authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

test: make serve tests xdist-safe with dynamic ports (#4951)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 7d90b2d1
......@@ -268,6 +268,13 @@ def parse_args():
async def async_main():
# The system status server port is a worker concern.
#
# Serve tests set DYN_SYSTEM_PORT for the worker, but aggregated launch scripts
# start `dynamo.frontend` first. If the frontend inherits DYN_SYSTEM_PORT, it can
# bind that port before the worker, causing port conflicts and/or scraping the
# wrong metrics endpoint.
os.environ.pop("DYN_SYSTEM_PORT", None)
flags = parse_args()
dump_config(flags.dump_config_to, flags)
......
......@@ -54,7 +54,9 @@ DYNAMO_PID=$!
#AssertionError: Prefill round robin balance is required when dp size > 1. Please make sure that the prefill instance is launched with `--load-balance-method round_robin` and `--prefill-round-robin-balance` is set for decode server.
# run prefill worker
OTEL_SERVICE_NAME=dynamo-worker-prefill DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_PREFILL:-8081} \
# Use DYN_SYSTEM_PORT1/2 instead of *_PREFILL/*_DECODE env names so test
# harnesses can set one simple pair for disaggregated deployments.
OTEL_SERVICE_NAME=dynamo-worker-prefill DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT1:-8081} \
python3 -m dynamo.sglang \
--model-path silence09/DeepSeek-R1-Small-2layers \
--served-model-name silence09/DeepSeek-R1-Small-2layers \
......@@ -72,7 +74,7 @@ python3 -m dynamo.sglang \
PREFILL_PID=$!
# run decode worker
OTEL_SERVICE_NAME=dynamo-worker-decode DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_DECODE:-8082} \
OTEL_SERVICE_NAME=dynamo-worker-decode DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT2:-8082} \
CUDA_VISIBLE_DEVICES=2,3 python3 -m dynamo.sglang \
--model-path silence09/DeepSeek-R1-Small-2layers \
--served-model-name silence09/DeepSeek-R1-Small-2layers \
......
......@@ -56,7 +56,9 @@ python3 -m dynamo.frontend \
DYNAMO_PID=$!
# run prefill router
OTEL_SERVICE_NAME=dynamo-router-prefill DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_PREFILL_ROUTER:-8081} \
# Use numeric DYN_SYSTEM_PORT{N} env vars so launchers/test harnesses can set
# ports without encoding role names (prefill/decode) in the env var.
OTEL_SERVICE_NAME=dynamo-router-prefill DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT1:-8081} \
python3 -m dynamo.router \
--endpoint dynamo.prefill.generate \
--block-size 64 \
......@@ -65,7 +67,7 @@ python3 -m dynamo.router \
PREFILL_ROUTER_PID=$!
# run prefill worker
OTEL_SERVICE_NAME=dynamo-worker-prefill-1 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_PREFILL_WORKER1:-8082} \
OTEL_SERVICE_NAME=dynamo-worker-prefill-1 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT2:-8082} \
python3 -m dynamo.sglang \
--model-path deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--served-model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
......@@ -81,7 +83,7 @@ python3 -m dynamo.sglang \
PREFILL_PID=$!
# run prefill worker
OTEL_SERVICE_NAME=dynamo-worker-prefill-2 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_PREFILL_WORKER2:-8083} \
OTEL_SERVICE_NAME=dynamo-worker-prefill-2 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT3:-8083} \
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.sglang \
--model-path deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--served-model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
......@@ -97,7 +99,7 @@ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.sglang \
PREFILL_PID=$!
# run decode worker
OTEL_SERVICE_NAME=dynamo-worker-decode-1 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_DECODE_WORKER1:-8084} \
OTEL_SERVICE_NAME=dynamo-worker-decode-1 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT4:-8084} \
CUDA_VISIBLE_DEVICES=3 python3 -m dynamo.sglang \
--model-path deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--served-model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
......@@ -113,7 +115,7 @@ CUDA_VISIBLE_DEVICES=3 python3 -m dynamo.sglang \
PREFILL_PID=$!
# run decode worker
OTEL_SERVICE_NAME=dynamo-worker-decode-2 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_DECODE_WORKER2:-8085} \
OTEL_SERVICE_NAME=dynamo-worker-decode-2 DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT5:-8085} \
CUDA_VISIBLE_DEVICES=2 python3 -m dynamo.sglang \
--model-path deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--served-model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
......
......@@ -19,6 +19,11 @@ python -m dynamo.frontend \
# run workers
# --enforce-eager is added for quick deployment. for production use, need to remove this flag
#
# If multiple workers are launched, they must not share the same system/metrics port.
# Use DYN_SYSTEM_PORT{1,2} so tests/launchers can provide a simple numbered port set.
#
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT1:-8081} \
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \
--model $MODEL \
--block-size $BLOCK_SIZE \
......@@ -26,6 +31,7 @@ CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \
--connector none \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080","enable_kv_cache_events":true}' &
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT2:-8082} \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \
--model $MODEL \
......
......@@ -9,8 +9,10 @@ trap 'echo Cleaning up...; kill 0' EXIT
python -m dynamo.frontend &
# --enforce-eager is added for quick deployment. for production use, need to remove this flag
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager &
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT1:-8081} \
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager &
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT2:-8082} \
DYN_VLLM_KV_EVENT_PORT=20081 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \
......
......@@ -48,7 +48,9 @@ DYNAMO_PID=$!
# run decode worker with metrics on port 8081
# --enforce-eager is added for quick deployment. for production use, need to remove this flag
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \
# For disaggregated deployments we standardize on DYN_SYSTEM_PORT1/2 instead of
# *_PREFILL/*_DECODE env names so test harnesses can set one simple pair.
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT1:-8081} \
CUDA_VISIBLE_DEVICES=0 \
python3 -m dynamo.vllm \
--model Qwen/Qwen3-0.6B \
......@@ -66,7 +68,7 @@ echo "Waiting for decode worker to initialize..."
sleep 10
# run prefill worker with metrics on port 8082 (foreground)
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_PREFILL:-8082} \
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT2:-8082} \
DYN_VLLM_KV_EVENT_PORT=20081 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=0 \
......
......@@ -17,11 +17,12 @@ export DYN_LORA_PATH=/tmp/dynamo_loras_minio
mkdir -p $DYN_LORA_PATH
# run ingress
python -m dynamo.frontend --http-port=8000 &
# dynamo.frontend accepts either --http-port flag or DYN_HTTP_PORT env var.
python -m dynamo.frontend &
# run worker
# --enforce-eager is added for quick deployment. for production use, need to remove this flag
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 \
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT1:-8081} \
python -m dynamo.vllm --model Qwen/Qwen3-0.6B --enforce-eager \
--connector none \
--enable-lora \
......
......@@ -31,7 +31,7 @@ python -m dynamo.frontend \
# run workers
# --enforce-eager is added for quick deployment. for production use, need to remove this flag
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8082 \
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT1:-8081} \
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \
--model $MODEL \
--block-size $BLOCK_SIZE \
......@@ -41,7 +41,7 @@ CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.vllm \
--max-lora-rank 64 \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080","enable_kv_cache_events":true}' &
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 \
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT2:-8082} \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm \
--model $MODEL \
......
......@@ -3,15 +3,19 @@
"""Common base classes and utilities for engine tests (vLLM, TRT-LLM, etc.)"""
import dataclasses
import logging
import os
from collections.abc import Mapping
from copy import deepcopy
from typing import Any, Dict, Optional
import pytest
from dynamo.common.utils.paths import WORKSPACE_DIR
from tests.serve.conftest import ServicePorts
from tests.utils.client import send_request
from tests.utils.constants import DefaultPort
from tests.utils.engine_process import EngineConfig, EngineProcess
DEFAULT_TIMEOUT = 10
......@@ -22,6 +26,8 @@ SERVE_TEST_DIR = os.path.join(WORKSPACE_DIR, "tests/serve")
def run_serve_deployment(
config: EngineConfig,
request: Any,
*,
ports: ServicePorts | None = None, # pass `dynamo_dynamic_ports` here
extra_env: Optional[Dict[str, str]] = None,
) -> None:
"""Run a standard serve deployment test for any EngineConfig.
......@@ -41,30 +47,83 @@ def run_serve_deployment(
logger.info("Using model: %s", config.model)
logger.info("Script: %s", config.script_name)
merged_env: dict[str, str] = {}
if extra_env:
merged_env.update(extra_env)
if ports is not None:
dynamic_frontend_port = int(ports.frontend_port)
dynamic_system_port1 = int(ports.system_port1)
dynamic_system_port2 = int(ports.system_port2)
# The environments are used by the bash scripts to set the ports.
merged_env.update(
{
"DYN_HTTP_PORT": str(dynamic_frontend_port),
# Alias for PORT1 (many scripts only read this).
"DYN_SYSTEM_PORT": str(dynamic_system_port1),
"DYN_SYSTEM_PORT1": str(dynamic_system_port1),
"DYN_SYSTEM_PORT2": str(dynamic_system_port2),
}
)
# Ensure EngineProcess health checks hit the correct frontend port.
config = dataclasses.replace(config, frontend_port=dynamic_frontend_port)
else:
# Backward compat: infer from config/extra_env if no explicit ports are passed.
dynamic_frontend_port = int(config.frontend_port)
dynamic_system_port1 = int(
merged_env.get("DYN_SYSTEM_PORT1")
or merged_env.get("DYN_SYSTEM_PORT")
or DefaultPort.SYSTEM1.value
)
dynamic_system_port2 = int(
merged_env.get("DYN_SYSTEM_PORT2") or DefaultPort.SYSTEM2.value
)
with EngineProcess.from_script(
config, request, extra_env=extra_env
config, request, extra_env=merged_env
) as server_process:
for payload in config.request_payloads:
logger.info("TESTING: Payload: %s", payload.__class__.__name__)
for _payload in config.request_payloads:
logger.info("TESTING: Payload: %s", _payload.__class__.__name__)
payload_item = payload
# Make a per-iteration copy so tests can safely override ports/fields
# without mutating shared config instances across parametrized cases.
payload = deepcopy(_payload)
# inject model
if hasattr(payload_item, "with_model"):
payload_item = payload_item.with_model(config.model)
if payload_item.port != config.models_port:
logger.warning(
f"Current payload port: {payload_item.port} doesn't match the model port: {config.models_port}"
)
for _ in range(payload_item.repeat_count):
if hasattr(payload, "with_model"):
payload = payload.with_model(config.model)
# Default behavior: requests go to the frontend port, except metrics which target
# worker system ports (mapped from DefaultPort -> per-test ports).
if getattr(payload, "endpoint", "") == "/metrics":
if payload.port == DefaultPort.SYSTEM1.value:
payload.port = dynamic_system_port1
elif payload.port == DefaultPort.SYSTEM2.value:
payload.port = dynamic_system_port2
else:
payload.port = dynamic_frontend_port
# Optional extra system ports for specialized payloads (e.g. LoRA control-plane APIs).
# BasePayload always defines `system_ports` (usually empty); map defaults
# (SYSTEM_PORT1/2) to per-test system ports when present.
if payload.system_ports:
mapped_system_ports: list[int] = []
for p in payload.system_ports:
if p == DefaultPort.SYSTEM1.value:
mapped_system_ports.append(dynamic_system_port1)
elif p == DefaultPort.SYSTEM2.value:
mapped_system_ports.append(dynamic_system_port2)
else:
mapped_system_ports.append(p)
payload.system_ports = mapped_system_ports
for _ in range(payload.repeat_count):
response = send_request(
url=payload_item.url(),
payload=payload_item.body,
timeout=payload_item.timeout,
method=payload_item.method,
url=payload.url(),
payload=payload.body,
timeout=payload.timeout,
method=payload.method,
)
server_process.check_response(payload_item, response)
server_process.check_response(payload, response)
def params_with_model_mark(configs: Mapping[str, EngineConfig]):
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import os
import shutil
from dataclasses import dataclass
from typing import Generator
import pytest
from pytest_httpserver import HTTPServer
from dynamo.common.utils.paths import WORKSPACE_DIR
from tests.serve.lora_utils import MinioLoraConfig, MinioService
from tests.utils.constants import DefaultPort
from tests.utils.port_utils import allocate_port, allocate_ports, deallocate_ports
# Shared constants for multimodal testing
IMAGE_SERVER_PORT = 8765
......@@ -18,6 +22,38 @@ MULTIMODAL_IMG_PATH = os.path.join(
MULTIMODAL_IMG_URL = f"http://localhost:{IMAGE_SERVER_PORT}/llm-graphic.png"
@dataclass(frozen=True)
class ServicePorts:
frontend_port: int
system_port1: int
system_port2: int
@pytest.fixture(scope="function")
def dynamo_dynamic_ports() -> Generator[ServicePorts, None, None]:
"""Allocate per-test ports for serve-style deployments.
- frontend_port: OpenAI-compatible HTTP ingress (dynamo.frontend)
- system_port1/system_port2: worker metrics/system ports (used by some scripts)
Note: some disaggregated launch scripts can spawn more than two workers; if/when
serve tests start exercising those scripts, we'll extend this fixture to allocate
additional system ports (e.g. system_port3+ / DYN_SYSTEM_PORT3+).
"""
frontend_port = allocate_port(DefaultPort.FRONTEND.value)
system_ports = allocate_ports(2, DefaultPort.SYSTEM1.value)
ports = [frontend_port, *system_ports]
try:
yield ServicePorts(
frontend_port=frontend_port,
system_port1=system_ports[0],
system_port2=system_ports[1],
)
finally:
deallocate_ports(ports)
@pytest.fixture(scope="session")
def httpserver_listen_address():
return ("127.0.0.1", IMAGE_SERVER_PORT)
......@@ -72,6 +108,12 @@ def minio_lora_service():
# Use config.get_env_vars() for environment setup
# Use config.get_s3_uri() to get the S3 URI for loading LoRA
"""
# LoRA serve tests spin up a local MinIO via Docker. Some environments are
# intentionally minimal (e.g. vLLM-only containers) and do not include the
# docker CLI, in which case we skip the LoRA tests.
if shutil.which("docker") is None:
pytest.skip("LoRA serve tests require the docker CLI (MinIO container).")
config = MinioLoraConfig()
service = MinioService(config)
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import dataclasses
import logging
import os
from dataclasses import dataclass, field
......@@ -13,6 +14,7 @@ from tests.serve.common import (
params_with_model_mark,
run_serve_deployment,
)
from tests.utils.constants import DefaultPort
from tests.utils.engine_process import EngineConfig
from tests.utils.payload_builder import (
chat_payload,
......@@ -54,7 +56,7 @@ sglang_configs = {
],
model="Qwen/Qwen3-0.6B",
env={},
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(),
completion_payload_default(),
......@@ -68,7 +70,7 @@ sglang_configs = {
marks=[pytest.mark.gpu_2, pytest.mark.post_merge],
model="Qwen/Qwen3-0.6B",
env={},
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(),
completion_payload_default(),
......@@ -76,7 +78,8 @@ sglang_configs = {
),
"disaggregated_same_gpu": SGLangConfig(
# Uses disagg_same_gpu.sh for single-GPU disaggregated testing
# Validates metrics from both prefill (port 8081) and decode (port 8082) workers
# Validates metrics from both prefill (DefaultPort.SYSTEM1) and decode
# (DefaultPort.SYSTEM2) workers
name="disaggregated_same_gpu",
directory=sglang_dir,
script_name="disagg_same_gpu.sh",
......@@ -87,14 +90,24 @@ sglang_configs = {
],
model="Qwen/Qwen3-0.6B",
env={},
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(),
completion_payload_default(),
# Validate dynamo_component_* and sglang:* metrics from prefill worker (port 8081)
metric_payload_default(min_num_requests=6, backend="sglang", port=8081),
# Validate dynamo_component_* and sglang:* metrics from decode worker (port 8082)
metric_payload_default(min_num_requests=6, backend="sglang", port=8082),
# Validate dynamo_component_* and sglang:* metrics from prefill worker
# (DefaultPort.SYSTEM1)
metric_payload_default(
min_num_requests=6,
backend="sglang",
port=DefaultPort.SYSTEM1.value,
),
# Validate dynamo_component_* and sglang:* metrics from decode worker
# (DefaultPort.SYSTEM2)
metric_payload_default(
min_num_requests=6,
backend="sglang",
port=DefaultPort.SYSTEM2.value,
),
],
),
"kv_events": SGLangConfig(
......@@ -106,7 +119,7 @@ sglang_configs = {
env={
"DYN_LOG": "dynamo_llm::kv_router::publisher=trace,dynamo_llm::kv_router::scheduler=info",
},
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(
expected_log=[
......@@ -135,7 +148,7 @@ sglang_configs = {
],
model="Qwen/Qwen3-0.6B",
env={},
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(
expected_response=["Successfully Applied Chat Template"]
......@@ -150,7 +163,7 @@ sglang_configs = {
model="Qwen/Qwen2.5-VL-7B-Instruct",
delayed_start=0,
timeout=360,
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload(
[
......@@ -183,7 +196,7 @@ sglang_configs = {
],
model="Qwen/Qwen3-Embedding-4B",
delayed_start=0,
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
# Test default payload with multiple inputs
embedding_payload_default(
......@@ -241,11 +254,17 @@ def sglang_config_test(request):
@pytest.mark.e2e
@pytest.mark.sglang
def test_sglang_deployment(
sglang_config_test, request, runtime_services, predownload_models
sglang_config_test,
request,
runtime_services_dynamic_ports,
dynamo_dynamic_ports,
predownload_models,
):
"""Test SGLang deployment scenarios using common helpers"""
config = sglang_config_test
run_serve_deployment(config, request)
config = dataclasses.replace(
sglang_config_test, frontend_port=dynamo_dynamic_ports.frontend_port
)
run_serve_deployment(config, request, ports=dynamo_dynamic_ports)
@pytest.mark.e2e
......@@ -255,7 +274,9 @@ def test_sglang_deployment(
@pytest.mark.skip(
reason="Requires 4 GPUs - enable when hardware is consistently available"
)
def test_sglang_disagg_dp_attention(request, runtime_services, predownload_models):
def test_sglang_disagg_dp_attention(
request, runtime_services_dynamic_ports, dynamo_dynamic_ports, predownload_models
):
"""Test sglang disaggregated with DP attention (requires 4 GPUs)"""
# Kept for reference; this test uses a different launch path and is skipped
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import dataclasses
import logging
import os
from dataclasses import dataclass, field
......@@ -12,6 +13,7 @@ from tests.serve.common import (
params_with_model_mark,
run_serve_deployment,
)
from tests.utils.constants import DefaultPort
from tests.utils.engine_process import EngineConfig
from tests.utils.payload_builder import (
TEXT_PROMPT,
......@@ -54,7 +56,7 @@ trtllm_configs = {
), # 3x measured time (44.66s) + download time (150s)
],
model="Qwen/Qwen3-0.6B",
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(),
completion_payload_default(),
......@@ -67,7 +69,7 @@ trtllm_configs = {
script_name="disagg.sh",
marks=[pytest.mark.gpu_2, pytest.mark.trtllm, pytest.mark.post_merge],
model="Qwen/Qwen3-0.6B",
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(),
completion_payload_default(),
......@@ -87,12 +89,16 @@ trtllm_configs = {
), # 3x measured time (103.66s) + download time (150s)
],
model="Qwen/Qwen3-0.6B",
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(),
completion_payload_default(),
metric_payload_default(port=8081, min_num_requests=6, backend="trtllm"),
metric_payload_default(port=8082, min_num_requests=6, backend="trtllm"),
metric_payload_default(
port=DefaultPort.SYSTEM1.value, min_num_requests=6, backend="trtllm"
),
metric_payload_default(
port=DefaultPort.SYSTEM2.value, min_num_requests=6, backend="trtllm"
),
],
),
"aggregated_logprobs": TRTLLMConfig(
......@@ -101,7 +107,7 @@ trtllm_configs = {
script_name="agg.sh",
marks=[pytest.mark.gpu_1, pytest.mark.pre_merge, pytest.mark.trtllm],
model="Qwen/Qwen3-0.6B",
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload(content=TEXT_PROMPT, logprobs=True, top_logprobs=5),
chat_payload(content=TEXT_PROMPT, logprobs=False, top_logprobs=5),
......@@ -115,7 +121,7 @@ trtllm_configs = {
script_name="disagg.sh",
marks=[pytest.mark.gpu_2, pytest.mark.post_merge, pytest.mark.trtllm],
model="Qwen/Qwen3-0.6B",
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload(content=TEXT_PROMPT, logprobs=True, top_logprobs=5),
chat_payload(content=TEXT_PROMPT, logprobs=False, top_logprobs=5),
......@@ -136,7 +142,7 @@ trtllm_configs = {
), # 3x measured time (37.91s) + download time (180s)
],
model="Qwen/Qwen3-0.6B",
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(
expected_log=[
......@@ -155,7 +161,7 @@ trtllm_configs = {
script_name="disagg_router.sh",
marks=[pytest.mark.gpu_2, pytest.mark.trtllm, pytest.mark.nightly],
model="Qwen/Qwen3-0.6B",
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
request_payloads=[
chat_payload_default(),
completion_payload_default(),
......@@ -167,7 +173,7 @@ trtllm_configs = {
script_name="disagg_multimodal.sh",
marks=[pytest.mark.gpu_2, pytest.mark.trtllm, pytest.mark.multimodal],
model="Qwen/Qwen2-VL-7B-Instruct",
models_port=8000,
frontend_port=DefaultPort.FRONTEND.value,
timeout=900,
delayed_start=60,
request_payloads=[multimodal_payload_default()],
......@@ -205,13 +211,28 @@ def trtllm_config_test(request):
@pytest.mark.trtllm
@pytest.mark.e2e
def test_deployment(trtllm_config_test, request, runtime_services, predownload_models):
def test_deployment(
trtllm_config_test,
request,
runtime_services_dynamic_ports,
dynamo_dynamic_ports,
predownload_models,
):
"""
Test dynamo deployments with different configurations.
"""
config = trtllm_config_test
extra_env = {"MODEL_PATH": config.model, "SERVED_MODEL_NAME": config.model}
run_serve_deployment(config, request, extra_env=extra_env)
# Use per-test ports so tests can run safely under pytest-xdist.
config = dataclasses.replace(
trtllm_config_test, frontend_port=dynamo_dynamic_ports.frontend_port
)
# Non-port env stays here; ports are wired by run_serve_deployment(ports=...).
config.env.update(
{
"MODEL_PATH": config.model,
"SERVED_MODEL_NAME": config.model,
}
)
run_serve_deployment(config, request, ports=dynamo_dynamic_ports)
# TODO make this a normal guy
......@@ -220,7 +241,11 @@ def test_deployment(trtllm_config_test, request, runtime_services, predownload_m
@pytest.mark.trtllm
@pytest.mark.timeout(660) # 3x measured time (159.68s) + download time (180s)
def test_chat_only_aggregated_with_test_logits_processor(
request, runtime_services, predownload_models, monkeypatch
request,
runtime_services_dynamic_ports,
dynamo_dynamic_ports,
predownload_models,
monkeypatch,
):
"""
Run a single aggregated chat-completions test using Qwen 0.6B with the
......@@ -244,4 +269,13 @@ def test_chat_only_aggregated_with_test_logits_processor(
timeout=base.timeout,
)
run_serve_deployment(config, request)
config = dataclasses.replace(
config, frontend_port=dynamo_dynamic_ports.frontend_port
)
config.env.update(
{
"MODEL_PATH": config.model,
"SERVED_MODEL_NAME": config.model,
}
)
run_serve_deployment(config, request, ports=dynamo_dynamic_ports)
......@@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import base64
import dataclasses
import logging
import os
import random
......@@ -17,6 +18,7 @@ from tests.serve.common import (
)
from tests.serve.conftest import MULTIMODAL_IMG_PATH, MULTIMODAL_IMG_URL
from tests.serve.lora_utils import MinioLoraConfig
from tests.utils.constants import DefaultPort
from tests.utils.engine_process import EngineConfig
from tests.utils.payload_builder import (
chat_payload,
......@@ -45,7 +47,10 @@ vllm_dir = os.environ.get("VLLM_DIR") or os.path.join(
# vLLM test configurations
# NOTE: pytest.mark.gpu_1 tests take ~5.5 minutes total to run sequentially (with models pre-cached)
# TODO: Parallelize these tests to reduce total execution time
# TODO: Now that these tests use dynamic ports, optimize the runtime by bin-packing and running
# multiple engine deployments in parallel (while keeping GPU contention under control). This may
# require annotating each config with approximate GPU RAM usage so a future collector/launcher can
# bin-pack safely.
vllm_configs = {
"aggregated": VLLMConfig(
name="aggregated",
......@@ -565,19 +570,29 @@ def vllm_config_test(request):
@pytest.mark.e2e
@pytest.mark.nightly
def test_serve_deployment(
vllm_config_test, request, runtime_services, predownload_models, image_server
vllm_config_test,
request,
runtime_services_dynamic_ports,
dynamo_dynamic_ports,
predownload_models,
image_server,
):
"""
Test dynamo serve deployments with different graph configurations.
"""
config = vllm_config_test
run_serve_deployment(config, request)
config = dataclasses.replace(
vllm_config_test, frontend_port=dynamo_dynamic_ports.frontend_port
)
run_serve_deployment(config, request, ports=dynamo_dynamic_ports)
@pytest.mark.vllm
@pytest.mark.e2e
@pytest.mark.gpu_2
def test_multimodal_b64(request, runtime_services, predownload_models):
@pytest.mark.timeout(360) # Match VLLMConfig.timeout for this multimodal deployment
def test_multimodal_b64(
request, runtime_services_dynamic_ports, dynamo_dynamic_ports, predownload_models
):
"""
Test multimodal inference with base64 url passthrough.
......@@ -618,7 +633,10 @@ def test_multimodal_b64(request, runtime_services, predownload_models):
request_payloads=[b64_payload],
)
run_serve_deployment(config, request)
config = dataclasses.replace(
config, frontend_port=dynamo_dynamic_ports.frontend_port
)
run_serve_deployment(config, request, ports=dynamo_dynamic_ports)
# LoRA Test Directory
......@@ -628,7 +646,7 @@ lora_dir = os.path.join(vllm_dir, "launch/lora")
def lora_chat_payload(
lora_name: str,
s3_uri: str,
system_port: int = 8081,
system_port: int = DefaultPort.SYSTEM1.value,
repeat_count: int = 2,
expected_response: Optional[list] = None,
expected_log: Optional[list] = None,
......@@ -666,7 +684,11 @@ def lora_chat_payload(
@pytest.mark.timeout(600)
@pytest.mark.nightly
def test_lora_aggregated(
request, runtime_services, predownload_models, minio_lora_service
request,
runtime_services_dynamic_ports,
predownload_models,
minio_lora_service,
dynamo_dynamic_ports,
):
"""
Test LoRA inference with aggregated vLLM deployment.
......@@ -683,7 +705,7 @@ def test_lora_aggregated(
lora_payload = lora_chat_payload(
lora_name=minio_config.lora_name,
s3_uri=minio_config.get_s3_uri(),
system_port=8081,
system_port=DefaultPort.SYSTEM1.value,
repeat_count=2,
)
......@@ -699,7 +721,15 @@ def test_lora_aggregated(
request_payloads=[lora_payload],
)
run_serve_deployment(config, request, extra_env=minio_config.get_env_vars())
config = dataclasses.replace(
config, frontend_port=dynamo_dynamic_ports.frontend_port
)
run_serve_deployment(
config,
request,
ports=dynamo_dynamic_ports,
extra_env=minio_config.get_env_vars(),
)
@pytest.mark.vllm
......@@ -709,7 +739,11 @@ def test_lora_aggregated(
@pytest.mark.timeout(600)
@pytest.mark.nightly
def test_lora_aggregated_router(
request, runtime_services, predownload_models, minio_lora_service
request,
runtime_services_dynamic_ports,
predownload_models,
minio_lora_service,
dynamo_dynamic_ports,
):
"""
Test LoRA inference with aggregated vLLM deployment using KV router.
......@@ -723,19 +757,19 @@ def test_lora_aggregated_router(
minio_config: MinioLoraConfig = minio_lora_service
# Create payloads that load LoRA on both workers and test inference
# Worker 1 (port 8081)
# Worker 1 (DefaultPort.SYSTEM1)
lora_payload_worker1 = lora_chat_payload(
lora_name=minio_config.lora_name,
s3_uri=minio_config.get_s3_uri(),
system_port=8081,
system_port=DefaultPort.SYSTEM1.value,
repeat_count=1,
)
# Worker 2 (port 8082)
# Worker 2 (DefaultPort.SYSTEM2)
lora_payload_worker2 = lora_chat_payload(
lora_name=minio_config.lora_name,
s3_uri=minio_config.get_s3_uri(),
system_port=8082,
system_port=DefaultPort.SYSTEM2.value,
repeat_count=1,
)
......@@ -768,4 +802,9 @@ def test_lora_aggregated_router(
],
)
run_serve_deployment(config, request, extra_env=env_vars)
config = dataclasses.replace(
config, frontend_port=dynamo_dynamic_ports.frontend_port
)
run_serve_deployment(
config, request, ports=dynamo_dynamic_ports, extra_env=env_vars
)
......@@ -8,6 +8,7 @@ avoid importing from conftest and to keep values consistent.
"""
import os
from enum import IntEnum
QWEN = "Qwen/Qwen3-0.6B"
LLAMA = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # on an l4 gpu, must limit --max-seq-len, otherwise it will not fit
......@@ -21,6 +22,16 @@ TEST_MODELS = [
QWEN_EMBEDDING,
]
# Default ports used by test payloads/scripts when not overridden.
# Tests that need xdist-safety should allocate real ports via fixtures and map
# these defaults to per-test ports at runtime.
class DefaultPort(IntEnum):
FRONTEND = 8000
SYSTEM1 = 8081
SYSTEM2 = 8082
# Env-driven defaults for specific test groups
# Allows overriding via environment variables
ROUTER_MODEL_NAME = os.environ.get("ROUTER_MODEL_NAME", QWEN)
......
......@@ -10,12 +10,15 @@ from typing import Any, Dict, List, Optional
import requests
from tests.utils.constants import DefaultPort
from tests.utils.managed_process import ManagedProcess
from tests.utils.payloads import BasePayload, check_health_generate, check_models_api
logger = logging.getLogger(__name__)
FRONTEND_PORT = 8000
FRONTEND_PORT = (
DefaultPort.FRONTEND.value
) # Do NOT use this in tests! Use allocate_port() instead.
class EngineResponseError(Exception):
......@@ -43,7 +46,7 @@ class EngineConfig:
script_name: Optional[str] = None
command: Optional[List[str]] = None
script_args: Optional[List[str]] = None
models_port: int = 8000
frontend_port: int = DefaultPort.FRONTEND.value
timeout: int = 600
delayed_start: int = 0
env: Dict[str, str] = field(default_factory=dict)
......@@ -174,9 +177,12 @@ class EngineProcess(ManagedProcess):
working_dir=config.directory,
health_check_ports=[],
health_check_urls=[
(f"http://localhost:{config.models_port}/v1/models", check_models_api),
(
f"http://localhost:{config.models_port}/health",
f"http://localhost:{config.frontend_port}/v1/models",
check_models_api,
),
(
f"http://localhost:{config.frontend_port}/health",
check_health_generate,
),
],
......
......@@ -27,6 +27,9 @@ from typing import Any, List, Optional
import psutil
import requests
from tests.utils.constants import DefaultPort
from tests.utils.port_utils import allocate_port, deallocate_port
def terminate_process(process, logger=logging.getLogger(), immediate_kill=False):
try:
......@@ -584,12 +587,54 @@ class DynamoFrontendProcess(ManagedProcess):
_logger = logging.getLogger()
def __init__(self, request):
command = ["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"]
def __init__(
self,
request: Any,
*,
frontend_port: Optional[int] = None,
router_mode: str = "round-robin",
extra_args: Optional[list[str]] = None,
extra_env: Optional[dict[str, str]] = None,
):
# TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to
# use this shared implementation (and delete the copies):
# - tests/frontend/test_vllm.py
# - tests/frontend/test_completion_mocker_engine.py
# - tests/frontend/grpc/test_tensor_parameters.py
# - tests/frontend/grpc/test_tensor_mocker_engine.py
# - tests/router/common.py
# - tests/router/test_router_e2e_with_vllm.py
# - tests/router/test_router_e2e_with_sglang.py
# - tests/router/test_router_e2e_with_trtllm.py
# - tests/fault_tolerance/cancellation/utils.py
# - tests/fault_tolerance/migration/utils.py
# - tests/fault_tolerance/etcd_ha/utils.py
# - tests/fault_tolerance/test_vllm_health_check.py
self._allocated_http_port: Optional[int] = None
if frontend_port == 0:
# Treat `0` as "allocate a random free port" for xdist-safe tests.
# We allocate within the i16-safe range required by the Rust side.
frontend_port = allocate_port(DefaultPort.FRONTEND.value)
self._allocated_http_port = frontend_port
# If frontend_port is unset, dynamo.frontend defaults to DefaultPort.FRONTEND.
self.http_port = (
DefaultPort.FRONTEND.value if frontend_port is None else int(frontend_port)
)
command = ["python", "-m", "dynamo.frontend", "--router-mode", router_mode]
# dynamo.frontend defaults to 8000 when neither env nor flag is provided.
if frontend_port is not None:
command.extend(["--http-port", str(frontend_port)])
if extra_args:
command.extend(extra_args)
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
env = os.environ.copy()
env.pop("DYN_SYSTEM_PORT", None)
if extra_env:
env.update(extra_env)
log_dir = f"{request.node.name}_frontend"
......@@ -609,16 +654,20 @@ class DynamoFrontendProcess(ManagedProcess):
log_dir=log_dir,
)
def __exit__(self, exc_type, exc_val, exc_tb):
try:
return super().__exit__(exc_type, exc_val, exc_tb)
finally:
if self._allocated_http_port is not None:
deallocate_port(self._allocated_http_port)
self._allocated_http_port = None
def main():
# NOTE: This entrypoint is for manual testing/debugging of `ManagedProcess` only.
# It is not used by the pytest suite.
with ManagedProcess(
command=[
"dynamo",
"run",
"in=http",
"out=vllm",
"deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
],
command=["python", "-m", "dynamo.frontend"],
display_output=True,
terminate_existing=True,
health_check_ports=[8000],
......
......@@ -4,6 +4,7 @@
from typing import Any, Dict, List, Optional, Union
from tests.utils.client import send_request
from tests.utils.constants import DefaultPort
from tests.utils.payloads import (
ChatPayload,
ChatPayloadWithLogprobs,
......@@ -115,7 +116,7 @@ def metric_payload_default(
repeat_count: int = 1,
expected_log: Optional[List[str]] = None,
backend: Optional[str] = None,
port: int = 8081,
port: int = DefaultPort.SYSTEM1.value,
) -> MetricsPayload:
return MetricsPayload(
body={},
......
......@@ -18,12 +18,13 @@ import math
import re
import time
from copy import deepcopy
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
import requests
from dynamo import prometheus_names # type: ignore[attr-defined]
from tests.utils.constants import DefaultPort
logger = logging.getLogger(__name__)
......@@ -40,9 +41,12 @@ class BasePayload:
# Connection info
host: str = "localhost"
port: int = 8000
port: int = DefaultPort.FRONTEND.value
endpoint: str = ""
method: str = "POST"
# Optional additional ports used by specialized payloads (e.g. LoRA system/control-plane APIs).
# This is intentionally empty by default to preserve prior semantics.
system_ports: list[int] = field(default_factory=list)
def url(self) -> str:
ep = self.endpoint.lstrip("/")
......@@ -256,7 +260,7 @@ class LoraTestChatPayload(ChatPayload):
body: dict,
lora_name: str,
s3_uri: str,
system_port: int = 8081,
system_port: int = DefaultPort.SYSTEM1.value,
repeat_count: int = 1,
expected_response: Optional[list] = None,
expected_log: Optional[list] = None,
......@@ -269,7 +273,7 @@ class LoraTestChatPayload(ChatPayload):
expected_log=expected_log or [],
timeout=timeout,
)
self.system_port = system_port
self.system_ports = [system_port]
self.lora_name = lora_name
self.s3_uri = s3_uri
self._lora_loaded = False
......@@ -282,7 +286,7 @@ class LoraTestChatPayload(ChatPayload):
from tests.serve.lora_utils import load_lora_adapter
load_lora_adapter(
system_port=self.system_port,
system_port=self.system_ports[0],
lora_name=self.lora_name,
s3_uri=self.s3_uri,
timeout=self.timeout,
......@@ -455,7 +459,7 @@ class MetricCheck:
class MetricsPayload(BasePayload):
endpoint: str = "/metrics"
method: str = "GET"
port: int = 8081
port: int = DefaultPort.SYSTEM1.value
min_num_requests: int = 1
backend: Optional[
str
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment