Unverified commit c22280cc, authored by Keiven C, committed by GitHub
Browse files

refactor: frontend/grpc tests to use dynamic ports (#4992)


Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent cd7b1492
...@@ -242,6 +242,12 @@ def parse_args(): ...@@ -242,6 +242,12 @@ def parse_args():
default=False, default=False,
help="Start KServe gRPC server.", help="Start KServe gRPC server.",
) )
parser.add_argument(
"--grpc-metrics-port",
type=int,
default=8788,
help="HTTP metrics port for gRPC service (u16). Only used with --kserve-grpc-server. Defaults to 8788.",
)
add_config_dump_args(parser) add_config_dump_args(parser)
parser.add_argument( parser.add_argument(
"--custom-backend-metrics-endpoint", "--custom-backend-metrics-endpoint",
...@@ -372,6 +378,8 @@ async def async_main(): ...@@ -372,6 +378,8 @@ async def async_main():
kwargs["tls_key_path"] = flags.tls_key_path kwargs["tls_key_path"] = flags.tls_key_path
if flags.namespace: if flags.namespace:
kwargs["namespace"] = flags.namespace kwargs["namespace"] = flags.namespace
if flags.kserve_grpc_server and flags.grpc_metrics_port:
kwargs["http_metrics_port"] = flags.grpc_metrics_port
if flags.custom_backend_metrics_endpoint: if flags.custom_backend_metrics_endpoint:
kwargs[ kwargs[
"custom_backend_metrics_endpoint" "custom_backend_metrics_endpoint"
......
...@@ -154,6 +154,7 @@ pub(crate) struct EntrypointArgs { ...@@ -154,6 +154,7 @@ pub(crate) struct EntrypointArgs {
kv_cache_block_size: Option<u32>, kv_cache_block_size: Option<u32>,
http_host: Option<String>, http_host: Option<String>,
http_port: u16, http_port: u16,
http_metrics_port: Option<u16>,
tls_cert_path: Option<PathBuf>, tls_cert_path: Option<PathBuf>,
tls_key_path: Option<PathBuf>, tls_key_path: Option<PathBuf>,
extra_engine_args: Option<PathBuf>, extra_engine_args: Option<PathBuf>,
...@@ -168,7 +169,7 @@ pub(crate) struct EntrypointArgs { ...@@ -168,7 +169,7 @@ pub(crate) struct EntrypointArgs {
impl EntrypointArgs { impl EntrypointArgs {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[new] #[new]
#[pyo3(signature = (engine_type, model_path=None, model_name=None, endpoint_id=None, context_length=None, template_file=None, router_config=None, kv_cache_block_size=None, http_host=None, http_port=None, tls_cert_path=None, tls_key_path=None, extra_engine_args=None, namespace=None, custom_backend_metrics_endpoint=None, custom_backend_metrics_polling_interval=None, is_prefill=false, engine_factory=None))] #[pyo3(signature = (engine_type, model_path=None, model_name=None, endpoint_id=None, context_length=None, template_file=None, router_config=None, kv_cache_block_size=None, http_host=None, http_port=None, http_metrics_port=None, tls_cert_path=None, tls_key_path=None, extra_engine_args=None, namespace=None, custom_backend_metrics_endpoint=None, custom_backend_metrics_polling_interval=None, is_prefill=false, engine_factory=None))]
pub fn new( pub fn new(
py: Python<'_>, py: Python<'_>,
engine_type: EngineType, engine_type: EngineType,
...@@ -181,6 +182,7 @@ impl EntrypointArgs { ...@@ -181,6 +182,7 @@ impl EntrypointArgs {
kv_cache_block_size: Option<u32>, kv_cache_block_size: Option<u32>,
http_host: Option<String>, http_host: Option<String>,
http_port: Option<u16>, http_port: Option<u16>,
http_metrics_port: Option<u16>,
tls_cert_path: Option<PathBuf>, tls_cert_path: Option<PathBuf>,
tls_key_path: Option<PathBuf>, tls_key_path: Option<PathBuf>,
extra_engine_args: Option<PathBuf>, extra_engine_args: Option<PathBuf>,
...@@ -226,6 +228,7 @@ impl EntrypointArgs { ...@@ -226,6 +228,7 @@ impl EntrypointArgs {
kv_cache_block_size, kv_cache_block_size,
http_host, http_host,
http_port: http_port.unwrap_or(DEFAULT_HTTP_PORT), http_port: http_port.unwrap_or(DEFAULT_HTTP_PORT),
http_metrics_port,
tls_cert_path, tls_cert_path,
tls_key_path, tls_key_path,
extra_engine_args, extra_engine_args,
...@@ -267,6 +270,7 @@ pub fn make_engine<'p>( ...@@ -267,6 +270,7 @@ pub fn make_engine<'p>(
.router_config(args.router_config.clone().map(|rc| rc.into())) .router_config(args.router_config.clone().map(|rc| rc.into()))
.http_host(args.http_host.clone()) .http_host(args.http_host.clone())
.http_port(args.http_port) .http_port(args.http_port)
.http_metrics_port(args.http_metrics_port)
.tls_cert_path(args.tls_cert_path.clone()) .tls_cert_path(args.tls_cert_path.clone())
.tls_key_path(args.tls_key_path.clone()) .tls_key_path(args.tls_key_path.clone())
.is_mocker(matches!(args.engine_type, EngineType::Mocker)) .is_mocker(matches!(args.engine_type, EngineType::Mocker))
......
...@@ -21,10 +21,15 @@ pub async fn run( ...@@ -21,10 +21,15 @@ pub async fn run(
distributed_runtime: DistributedRuntime, distributed_runtime: DistributedRuntime,
engine_config: EngineConfig, engine_config: EngineConfig,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let grpc_service_builder = kserve::KserveService::builder() let mut grpc_service_builder = kserve::KserveService::builder()
.port(engine_config.local_model().http_port()) // [WIP] generalize port.. .port(engine_config.local_model().http_port()) // [WIP] generalize port..
.with_request_template(engine_config.local_model().request_template()); .with_request_template(engine_config.local_model().request_template());
// Set HTTP metrics port if provided (for parallel test execution)
if let Some(http_metrics_port) = engine_config.local_model().http_metrics_port() {
grpc_service_builder = grpc_service_builder.http_metrics_port(http_metrics_port);
}
let grpc_service = match engine_config { let grpc_service = match engine_config {
EngineConfig::Dynamic { ref model, .. } => { EngineConfig::Dynamic { ref model, .. } => {
let grpc_service = grpc_service_builder.build()?; let grpc_service = grpc_service_builder.build()?;
......
...@@ -43,6 +43,7 @@ pub struct LocalModelBuilder { ...@@ -43,6 +43,7 @@ pub struct LocalModelBuilder {
kv_cache_block_size: u32, kv_cache_block_size: u32,
http_host: Option<String>, http_host: Option<String>,
http_port: u16, http_port: u16,
http_metrics_port: Option<u16>,
tls_cert_path: Option<PathBuf>, tls_cert_path: Option<PathBuf>,
tls_key_path: Option<PathBuf>, tls_key_path: Option<PathBuf>,
migration_limit: u32, migration_limit: u32,
...@@ -64,6 +65,7 @@ impl Default for LocalModelBuilder { ...@@ -64,6 +65,7 @@ impl Default for LocalModelBuilder {
kv_cache_block_size: DEFAULT_KV_CACHE_BLOCK_SIZE, kv_cache_block_size: DEFAULT_KV_CACHE_BLOCK_SIZE,
http_host: Default::default(), http_host: Default::default(),
http_port: DEFAULT_HTTP_PORT, http_port: DEFAULT_HTTP_PORT,
http_metrics_port: None,
tls_cert_path: Default::default(), tls_cert_path: Default::default(),
tls_key_path: Default::default(), tls_key_path: Default::default(),
model_path: Default::default(), model_path: Default::default(),
...@@ -125,6 +127,11 @@ impl LocalModelBuilder { ...@@ -125,6 +127,11 @@ impl LocalModelBuilder {
self self
} }
/// Stores the optional HTTP metrics port on the builder and returns `self`
/// so further setters can be chained.
///
/// Passing `None` clears any previously stored value.
pub fn http_metrics_port(&mut self, port: Option<u16>) -> &mut Self {
self.http_metrics_port = port;
self
}
pub fn tls_cert_path(&mut self, p: Option<PathBuf>) -> &mut Self { pub fn tls_cert_path(&mut self, p: Option<PathBuf>) -> &mut Self {
self.tls_cert_path = p; self.tls_cert_path = p;
self self
...@@ -259,6 +266,7 @@ impl LocalModelBuilder { ...@@ -259,6 +266,7 @@ impl LocalModelBuilder {
template, template,
http_host: self.http_host.take(), http_host: self.http_host.take(),
http_port: self.http_port, http_port: self.http_port,
http_metrics_port: self.http_metrics_port,
tls_cert_path: self.tls_cert_path.take(), tls_cert_path: self.tls_cert_path.take(),
tls_key_path: self.tls_key_path.take(), tls_key_path: self.tls_key_path.take(),
router_config: self.router_config.take().unwrap_or_default(), router_config: self.router_config.take().unwrap_or_default(),
...@@ -311,6 +319,7 @@ impl LocalModelBuilder { ...@@ -311,6 +319,7 @@ impl LocalModelBuilder {
template, template,
http_host: self.http_host.take(), http_host: self.http_host.take(),
http_port: self.http_port, http_port: self.http_port,
http_metrics_port: self.http_metrics_port,
tls_cert_path: self.tls_cert_path.take(), tls_cert_path: self.tls_cert_path.take(),
tls_key_path: self.tls_key_path.take(), tls_key_path: self.tls_key_path.take(),
router_config: self.router_config.take().unwrap_or_default(), router_config: self.router_config.take().unwrap_or_default(),
...@@ -330,6 +339,7 @@ pub struct LocalModel { ...@@ -330,6 +339,7 @@ pub struct LocalModel {
template: Option<RequestTemplate>, template: Option<RequestTemplate>,
http_host: Option<String>, http_host: Option<String>,
http_port: u16, http_port: u16,
http_metrics_port: Option<u16>,
tls_cert_path: Option<PathBuf>, tls_cert_path: Option<PathBuf>,
tls_key_path: Option<PathBuf>, tls_key_path: Option<PathBuf>,
router_config: RouterConfig, router_config: RouterConfig,
...@@ -380,6 +390,10 @@ impl LocalModel { ...@@ -380,6 +390,10 @@ impl LocalModel {
self.http_port self.http_port
} }
/// Returns the HTTP metrics port stored on this model, or `None` when no
/// port was configured.
pub fn http_metrics_port(&self) -> Option<u16> {
self.http_metrics_port
}
pub fn tls_cert_path(&self) -> Option<&Path> { pub fn tls_cert_path(&self) -> Option<&Path> {
self.tls_cert_path.as_deref() self.tls_cert_path.as_deref()
} }
......
...@@ -6,14 +6,15 @@ import os ...@@ -6,14 +6,15 @@ import os
import shutil import shutil
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Generator, Optional
import pytest import pytest
from filelock import FileLock from filelock import FileLock
from tests.utils.constants import TEST_MODELS from tests.utils.constants import TEST_MODELS, DefaultPort
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
from tests.utils.port_utils import ( from tests.utils.port_utils import (
ServicePorts,
allocate_port, allocate_port,
allocate_ports, allocate_ports,
deallocate_port, deallocate_port,
...@@ -631,3 +632,37 @@ def file_storage_backend(): ...@@ -631,3 +632,37 @@ def file_storage_backend():
os.environ["DYN_FILE_KV"] = old_env os.environ["DYN_FILE_KV"] = old_env
else: else:
os.environ.pop("DYN_FILE_KV", None) os.environ.pop("DYN_FILE_KV", None)
########################################################
# Shared Port Allocation (Dynamo deployments)
########################################################
@pytest.fixture(scope="function")
def num_system_ports(request) -> int:
    """How many system ports the requesting test wants allocated.

    Falls back to a single port when the test does not parametrize this
    fixture. Tests that need more (e.g. SYSTEM_PORT1 + SYSTEM_PORT2) opt in
    through indirect parametrization:

        @pytest.mark.parametrize("num_system_ports", [2], indirect=True)
    """
    # `request.param` only exists when the fixture is parametrized.
    if hasattr(request, "param"):
        return request.param
    return 1
@pytest.fixture(scope="function")
def dynamo_dynamic_ports(num_system_ports) -> Generator[ServicePorts, None, None]:
    """Allocate per-test ports for Dynamo deployments.

    - frontend_port: OpenAI-compatible HTTP/gRPC ingress (dynamo.frontend)
    - system_ports: List of worker metrics/system ports (configurable count
      via num_system_ports)

    Every port allocated here is released again when the test finishes, and
    also when allocation itself fails part-way through.
    """
    frontend_port = allocate_port(DefaultPort.FRONTEND.value)
    try:
        system_port_list = allocate_ports(num_system_ports, DefaultPort.SYSTEM1.value)
    except Exception:
        # The frontend port is already reserved at this point; release it so a
        # failed system-port allocation does not leak the reservation.
        deallocate_port(frontend_port)
        raise

    all_ports = [frontend_port, *system_port_list]
    try:
        yield ServicePorts(frontend_port=frontend_port, system_ports=system_port_list)
    finally:
        deallocate_ports(all_ports)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Shared fixtures for frontend tests."""
import logging
import os
import shutil
import time
import pytest
import requests
import tritonclient.grpc as grpcclient
from tests.utils.constants import QWEN
from tests.utils.managed_process import DynamoFrontendProcess, ManagedProcess
from tests.utils.port_utils import allocate_port, deallocate_port
logger = logging.getLogger(__name__)
@pytest.fixture(scope="function")
def start_services_with_http(
    request, runtime_services_dynamic_ports, dynamo_dynamic_ports
):
    """Run an HTTP frontend on dynamically allocated ports for one test.

    The fixture is function-scoped so tests can run in parallel: every test
    gets its own frontend on its own port. Requesting
    runtime_services_dynamic_ports gives the test dynamic NATS/Etcd ports.
    Worker processes are not started here; each test file starts its own.

    Yields:
        Tuple of (frontend_port, system_port) for use by worker processes
    """
    http_port = dynamo_dynamic_ports.frontend_port
    # terminate_existing=False: under xdist/parallel runs we must never kill a
    # frontend that belongs to another worker.
    frontend = DynamoFrontendProcess(
        request,
        frontend_port=http_port,
        terminate_existing=False,
    )
    with frontend:
        logger.info(f"HTTP Frontend started on port {http_port}")
        yield http_port, dynamo_dynamic_ports.system_ports[0]
def check_grpc_server_ready(
    port: int, max_attempts: int = 30, retry_delay: float = 0.5
) -> bool:
    """Poll the gRPC server until it reports ready on two consecutive checks.

    Args:
        port: gRPC server port
        max_attempts: Maximum number of connection attempts
        retry_delay: Delay between retry attempts in seconds

    Returns:
        True once the server reported ready twice in a row. False when all
        attempts completed without a client error but the server never
        reported (stable) readiness.

    Raises:
        Exception: Re-raises the client error if the final attempt fails with
            an exception.
    """
    for attempt in range(max_attempts):
        is_last_attempt = attempt == max_attempts - 1
        client = None
        try:
            client = grpcclient.InferenceServerClient(f"localhost:{port}")
            if client.is_server_ready():
                logger.info(
                    f"gRPC server is ready on port {port} (attempt {attempt + 1}/{max_attempts})"
                )
                # Add delay after readiness check to ensure server is fully stable for parallel tests
                # Retry the check once more to confirm stability
                time.sleep(0.5)
                if client.is_server_ready():
                    logger.info(f"gRPC server confirmed stable on port {port}")
                    return True
                logger.warning(
                    f"gRPC server became unstable on port {port}, retrying..."
                )
                continue

            # The server answered but is not ready yet. Back off before the
            # next attempt; otherwise all attempts are spent within
            # milliseconds and the caller gets a false "never ready".
            logger.debug(
                f"gRPC server reported not ready on attempt {attempt + 1}/{max_attempts}"
            )
            if not is_last_attempt:
                time.sleep(retry_delay)
        except Exception as e:
            if not is_last_attempt:
                logger.debug(f"gRPC server not ready on attempt {attempt + 1}: {e}")
                time.sleep(retry_delay)
            else:
                logger.error(
                    f"gRPC server not ready after {max_attempts} attempts: {e}"
                )
                raise
        finally:
            # Release the channel opened for this attempt so polling does not
            # accumulate open connections.
            if client is not None:
                client.close()
    return False
def wait_for_http_completions_ready(
    *,
    frontend_port: int,
    model: str,
    max_attempts: int = 30,
    retry_delay: float = 0.25,
) -> None:
    """Block until `/v1/completions` answers 200 for the given model.

    Why this exists:
    - `/v1/models` can list a model slightly before the HTTP completions route
      is ready to route requests to it (under xdist parallel startup).
    - Sending requests immediately can therefore intermittently produce a 404
      "Model not found" even though the model shows up in `/v1/models`.

    Raises:
        RuntimeError: If no attempt returned 200 within max_attempts; the
            message carries the last observed status and body.
    """
    completions_url = f"http://localhost:{frontend_port}/v1/completions"
    probe = {"model": model, "prompt": "ping", "max_tokens": 1}
    last_status: int | None = None
    last_body: str = ""

    for _ in range(max_attempts):
        try:
            resp = requests.post(completions_url, json=probe, timeout=10)
        except requests.RequestException as e:
            last_body = str(e)
        else:
            last_status, last_body = resp.status_code, resp.text
            if last_status == 200:
                return
        # Every non-200 outcome is retried after the same delay: the transient
        # 404 "Model not found" during startup, any other status, and
        # transport-level failures.
        time.sleep(retry_delay)

    raise RuntimeError(
        "HTTP completions route did not become ready "
        f"(frontend_port={frontend_port}, model={model}, "
        f"last_status={last_status}, last_body={last_body})"
    )
@pytest.fixture(scope="function")
def start_services_with_grpc(
    request, runtime_services_dynamic_ports, dynamo_dynamic_ports
):
    """Start gRPC frontend with dynamic ports.

    Function-scoped to allow parallel test execution.
    Each test gets its own gRPC frontend on a unique port.
    Uses runtime_services_dynamic_ports which provides isolated NATS/Etcd per
    test, so there are no namespace conflicts between tests.

    Allocates an additional port for the HTTP metrics server (used by the gRPC
    service internally) so parallel tests do not collide on port 8788.

    Individual test files should start their specific worker processes.

    Yields:
        Tuple of (frontend_port, system_port) for use by worker processes

    Raises:
        RuntimeError: If the gRPC frontend never reports ready.
    """
    ports = dynamo_dynamic_ports
    # Allocate additional port for HTTP metrics server (gRPC service requirement)
    grpc_metrics_port = allocate_port(8788)
    try:
        with DynamoFrontendProcess(
            request,
            frontend_port=ports.frontend_port,
            terminate_existing=False,
            extra_args=[
                "--kserve-grpc-server",
                "--grpc-metrics-port",
                str(grpc_metrics_port),
            ],
        ):
            logger.info(
                f"gRPC Frontend starting on port {ports.frontend_port} "
                f"(metrics on {grpc_metrics_port})"
            )
            # The readiness helper can return False without raising; fail here
            # instead of handing an unready server to the test.
            if not check_grpc_server_ready(ports.frontend_port):
                raise RuntimeError(
                    f"gRPC frontend on port {ports.frontend_port} did not become ready"
                )
            yield ports.frontend_port, ports.system_ports[0]
    finally:
        deallocate_port(grpc_metrics_port)
########################################################
# Shared Worker Classes
########################################################
class MockerWorkerProcess(ManagedProcess):
    """Managed `dynamo.mocker` worker shared by the frontend tests.

    Launches `python3 -m dynamo.mocker` for the given model and speedup ratio,
    and is considered healthy once the frontend lists at least one model and
    the worker's own `/health` endpoint reports status "ready".
    """

    def __init__(
        self,
        request,
        model: str,
        frontend_port: int,
        system_port: int,
        speedup_ratio: int = 100,
        worker_id: str = "mocker-worker",
    ):
        self.worker_id = worker_id
        self.frontend_port = frontend_port
        self.system_port = system_port

        mocker_command = ["python3", "-m", "dynamo.mocker"]
        mocker_command += ["--model-path", model]
        mocker_command += ["--speedup-ratio", str(speedup_ratio)]

        worker_env = os.environ.copy()
        worker_env.update(
            {
                "DYN_LOG": "debug",
                "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS": '["generate"]',
                "DYN_SYSTEM_PORT": str(system_port),
            }
        )

        # Start from an empty log directory; a missing one is fine.
        log_dir = f"{request.node.name}_{worker_id}"
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass

        models_url = f"http://localhost:{frontend_port}/v1/models"
        health_url = f"http://localhost:{system_port}/health"

        super().__init__(
            command=mocker_command,
            env=worker_env,
            health_check_urls=[
                (models_url, self._check_models_api),
                (health_url, self.is_ready),
            ],
            timeout=300,
            display_output=True,
            terminate_existing=False,
            stragglers=["VLLM::EngineCore"],
            straggler_commands=["-m dynamo.mocker"],
            log_dir=log_dir,
        )

    def _check_models_api(self, response):
        """Return True when the models API answers 200 with a non-empty "data" list."""
        try:
            if response.status_code != 200:
                return False
            return len(response.json().get("data", [])) > 0
        except Exception:
            return False

    def is_ready(self, response) -> bool:
        """Return True when the health response body has status "ready"."""
        try:
            body = response.json() or {}
            status = body.get("status")
        except ValueError:
            logger.warning("%s health response is not valid JSON", self.worker_id)
            return False

        ready = status == "ready"
        if not ready:
            logger.warning("%s status is not ready: %s", self.worker_id, status)
        else:
            logger.info("%s status is ready", self.worker_id)
        return ready
@pytest.fixture(scope="function")
def start_services_with_mocker(
    request, start_services_with_http, predownload_tokenizers
):
    """Run a mocker worker behind the per-test HTTP frontend.

    Function-scoped so tests can run in parallel: each test gets its own
    frontend plus mocker worker on unique ports. The fixture only yields once
    the completions route actually serves the model.

    Yields:
        frontend_port: Port where frontend is running
    """
    frontend_port, system_port = start_services_with_http

    # Default to QWEN for compatibility; per-test model selection not yet implemented.
    model = QWEN

    worker = MockerWorkerProcess(request, model, frontend_port, system_port)
    with worker:
        wait_for_http_completions_ready(frontend_port=frontend_port, model=model)
        logger.info(f"Mocker Worker started for test on port {frontend_port}")
        yield frontend_port
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# Parallelization: Hermetic test (xdist-safe via dynamic ports).
# Tested on: Linux (Ubuntu 24.04 container), Intel(R) Core(TM) i9-14900K, 32 vCPU.
# Combined pre_merge wall time (this file + test_tensor_parameters.py):
# - Serialized: 87.48s.
# - Parallel (-n auto): 25.27s (62.21s saved, 3.46x).
# GPU Requirement: gpu_0 (CPU-only, echo worker does not use GPU)
"""gRPC tensor echo test with mocker worker."""
from __future__ import annotations from __future__ import annotations
...@@ -11,7 +19,6 @@ import shutil ...@@ -11,7 +19,6 @@ import shutil
import pytest import pytest
import triton_echo_client import triton_echo_client
from tests.conftest import EtcdServer, NatsServer
from tests.utils.constants import QWEN from tests.utils.constants import QWEN
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
...@@ -20,38 +27,10 @@ logger = logging.getLogger(__name__) ...@@ -20,38 +27,10 @@ logger = logging.getLogger(__name__)
TEST_MODEL = QWEN TEST_MODEL = QWEN
class DynamoFrontendProcess(ManagedProcess):
"""Process manager for Dynamo frontend"""
def __init__(self, request):
command = ["python", "-m", "dynamo.frontend", "--kserve-grpc-server"]
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
env = os.environ.copy()
env.pop("DYN_SYSTEM_PORT", None)
log_dir = f"{request.node.name}_frontend"
# Clean up any existing log directory from previous runs
try:
shutil.rmtree(log_dir)
logger.info(f"Cleaned up existing log directory: {log_dir}")
except FileNotFoundError:
# Directory doesn't exist, which is fine
pass
super().__init__(
command=command,
env=env,
display_output=True,
terminate_existing=True,
log_dir=log_dir,
)
class MockWorkerProcess(ManagedProcess): class MockWorkerProcess(ManagedProcess):
def __init__(self, request, worker_id: str = "mocker-worker"): def __init__(self, request, system_port: int, worker_id: str = "mocker-worker"):
self.worker_id = worker_id self.worker_id = worker_id
self.system_port = system_port
command = [ command = [
"python3", "python3",
...@@ -61,7 +40,7 @@ class MockWorkerProcess(ManagedProcess): ...@@ -61,7 +40,7 @@ class MockWorkerProcess(ManagedProcess):
env = os.environ.copy() env = os.environ.copy()
env["DYN_LOG"] = "debug" env["DYN_LOG"] = "debug"
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
env["DYN_SYSTEM_PORT"] = "8083" env["DYN_SYSTEM_PORT"] = str(system_port)
log_dir = f"{request.node.name}_{worker_id}" log_dir = f"{request.node.name}_{worker_id}"
...@@ -75,8 +54,8 @@ class MockWorkerProcess(ManagedProcess): ...@@ -75,8 +54,8 @@ class MockWorkerProcess(ManagedProcess):
env=env, env=env,
health_check_urls=[ health_check_urls=[
# gRPC doesn't expose endpoint for listing models, so skip this check # gRPC doesn't expose endpoint for listing models, so skip this check
# ("http://localhost:8000/v1/models", check_models_api), # (f"http://localhost:{grpc_port}/v1/models", check_models_api),
("http://localhost:8083/health", self.is_ready), (f"http://localhost:{system_port}/health", self.is_ready),
], ],
timeout=300, timeout=300,
display_output=True, display_output=True,
...@@ -101,30 +80,29 @@ class MockWorkerProcess(ManagedProcess): ...@@ -101,30 +80,29 @@ class MockWorkerProcess(ManagedProcess):
return is_ready return is_ready
@pytest.fixture(scope="module") @pytest.fixture(scope="function")
def runtime_services(request): def start_services_with_echo_worker(request, start_services_with_grpc):
"""Module-scoped runtime services for this test file.""" """Start echo worker with the shared gRPC frontend.
with NatsServer(request) as nats_process:
with EtcdServer(request) as etcd_process:
yield nats_process, etcd_process
@pytest.fixture(scope="module") Function-scoped to allow parallel test execution.
def start_services(request, runtime_services): Each test gets its own gRPC frontend + echo worker on unique ports.
"""Start frontend and worker processes once for this module's tests.""" No namespace conflicts because runtime_services_dynamic_ports provides isolated Etcd/NATS.
with DynamoFrontendProcess(request): """
logger.info("Frontend started for tests") frontend_port, system_port = start_services_with_grpc
with MockWorkerProcess(request): with MockWorkerProcess(request, system_port):
logger.info("Worker started for tests") logger.info(f"gRPC Echo Worker started for test on port {frontend_port}")
yield yield frontend_port
@pytest.mark.usefixtures("start_services")
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_1 @pytest.mark.gpu_0 # Echo worker is CPU-only (no GPU required)
@pytest.mark.parallel
@pytest.mark.integration @pytest.mark.integration
@pytest.mark.model(TEST_MODEL) @pytest.mark.model(TEST_MODEL)
def test_echo() -> None: def test_echo(start_services_with_echo_worker) -> None:
triton_echo_client.check_health() frontend_port = start_services_with_echo_worker
triton_echo_client.run_infer() # Use a per-test client instance to avoid cross-test/global state issues.
triton_echo_client.get_config() client = triton_echo_client.TritonEchoClient(grpc_port=frontend_port)
client.check_health()
client.run_infer()
client.get_config()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# Parallelization: Hermetic test (xdist-safe via dynamic ports).
# Tested on: Linux (Ubuntu 24.04 container), Intel(R) Core(TM) i9-14900K, 32 vCPU.
# Combined pre_merge wall time (this file + test_tensor_mocker_engine.py):
# - Serialized: 87.48s.
# - Parallel (-n auto): 25.27s (62.21s saved, 3.46x).
# GPU Requirement: gpu_0 (CPU-only, tensor echo worker does not use GPU)
"""Test gRPC parameter passing with tensor models.""" """Test gRPC parameter passing with tensor models."""
import logging import logging
...@@ -16,27 +23,10 @@ from tests.utils.managed_process import ManagedProcess ...@@ -16,27 +23,10 @@ from tests.utils.managed_process import ManagedProcess
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DynamoFrontendProcess(ManagedProcess):
def __init__(self, request):
command = ["python", "-m", "dynamo.frontend", "--kserve-grpc-server"]
log_dir = f"{request.node.name}_frontend"
shutil.rmtree(log_dir, ignore_errors=True)
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
env = os.environ.copy()
env.pop("DYN_SYSTEM_PORT", None)
super().__init__(
command=command,
env=env,
display_output=True,
terminate_existing=True,
log_dir=log_dir,
)
class EchoTensorWorkerProcess(ManagedProcess): class EchoTensorWorkerProcess(ManagedProcess):
def __init__(self, request): def __init__(self, request, system_port: int):
self.system_port = system_port
command = [ command = [
"python3", "python3",
os.path.join(os.path.dirname(__file__), "echo_tensor_worker.py"), os.path.join(os.path.dirname(__file__), "echo_tensor_worker.py"),
...@@ -45,7 +35,9 @@ class EchoTensorWorkerProcess(ManagedProcess): ...@@ -45,7 +35,9 @@ class EchoTensorWorkerProcess(ManagedProcess):
env = os.environ.copy() env = os.environ.copy()
env["DYN_LOG"] = "debug" env["DYN_LOG"] = "debug"
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
env["DYN_SYSTEM_PORT"] = "8083" env["DYN_SYSTEM_PORT"] = str(system_port)
# Each test gets its own Etcd/NATS from runtime_services_dynamic_ports,
# so no namespace conflicts - use default "tensor" namespace
log_dir = f"{request.node.name}_worker" log_dir = f"{request.node.name}_worker"
shutil.rmtree(log_dir, ignore_errors=True) shutil.rmtree(log_dir, ignore_errors=True)
...@@ -55,22 +47,29 @@ class EchoTensorWorkerProcess(ManagedProcess): ...@@ -55,22 +47,29 @@ class EchoTensorWorkerProcess(ManagedProcess):
env=env, env=env,
health_check_urls=[ health_check_urls=[
( (
"http://localhost:8083/health", f"http://localhost:{system_port}/health",
lambda r: r.json().get("status") == "ready", lambda r: r.json().get("status") == "ready",
) )
], ],
timeout=300, timeout=300,
display_output=True, display_output=True,
log_dir=log_dir, log_dir=log_dir,
terminate_existing=False,
) )
@pytest.fixture() @pytest.fixture(scope="function")
def start_services(request, runtime_services): def start_services_with_echo_tensor_worker(request, start_services_with_grpc):
"""Start frontend and worker with fresh etcd/nats.""" """Start echo tensor worker with the shared gRPC frontend.
with DynamoFrontendProcess(request):
with EchoTensorWorkerProcess(request): Function-scoped to allow parallel test execution.
yield Each test gets its own gRPC frontend + echo tensor worker on unique ports.
No namespace conflicts because runtime_services_dynamic_ports provides isolated Etcd/NATS.
"""
frontend_port, system_port = start_services_with_grpc
with EchoTensorWorkerProcess(request, system_port):
logger.info(f"Echo Tensor Worker started for test on port {frontend_port}")
yield frontend_port
def extract_params(param_map) -> dict: def extract_params(param_map) -> dict:
...@@ -92,7 +91,8 @@ def extract_params(param_map) -> dict: ...@@ -92,7 +91,8 @@ def extract_params(param_map) -> dict:
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_1 @pytest.mark.gpu_0 # Echo tensor worker is CPU-only (no GPU required)
@pytest.mark.parallel
@pytest.mark.parametrize( @pytest.mark.parametrize(
"request_params", "request_params",
[ [
...@@ -102,14 +102,17 @@ def extract_params(param_map) -> dict: ...@@ -102,14 +102,17 @@ def extract_params(param_map) -> dict:
], ],
ids=["no_params", "numeric_param", "mixed_params"], ids=["no_params", "numeric_param", "mixed_params"],
) )
def test_request_parameters(file_storage_backend, start_services, request_params): def test_request_parameters(
file_storage_backend, start_services_with_echo_tensor_worker, request_params
):
"""Test gRPC request-level parameters are echoed through tensor models. """Test gRPC request-level parameters are echoed through tensor models.
The worker acts as an identity function: echoes input tensors unchanged and The worker acts as an identity function: echoes input tensors unchanged and
returns all request parameters plus a "processed" flag to verify the complete returns all request parameters plus a "processed" flag to verify the complete
parameter flow through the gRPC frontend. parameter flow through the gRPC frontend.
""" """
client = grpcclient.InferenceServerClient("localhost:8000") frontend_port = start_services_with_echo_tensor_worker
client = grpcclient.InferenceServerClient(f"localhost:{frontend_port}")
input_data = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32) input_data = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32)
inputs = [grpcclient.InferInput("INPUT", input_data.shape, "FP32")] inputs = [grpcclient.InferInput("INPUT", input_data.shape, "FP32")]
...@@ -118,6 +121,7 @@ def test_request_parameters(file_storage_backend, start_services, request_params ...@@ -118,6 +121,7 @@ def test_request_parameters(file_storage_backend, start_services, request_params
response = client.infer("echo", inputs=inputs, parameters=request_params) response = client.infer("echo", inputs=inputs, parameters=request_params)
output_data = response.as_numpy("INPUT") output_data = response.as_numpy("INPUT")
assert output_data is not None, "Expected response to include output tensor 'INPUT'"
assert np.array_equal(input_data, output_data) assert np.array_equal(input_data, output_data)
response_msg = response.get_response() response_msg = response.get_response()
......
...@@ -5,53 +5,67 @@ ...@@ -5,53 +5,67 @@
import numpy as np import numpy as np
import tritonclient.grpc as grpcclient import tritonclient.grpc as grpcclient
SERVER_URL = "localhost:8000"
class TritonEchoClient:
def check_health(): """Thin, per-instance Triton gRPC client wrapper used by frontend gRPC tests.
triton_client = grpcclient.InferenceServerClient(url=SERVER_URL)
assert triton_client.is_server_live() Why this exists:
assert triton_client.is_server_ready() - Some tests run under pytest-xdist or in threaded contexts.
assert triton_client.is_model_ready("echo") - Mutating module globals (like GRPC_PORT) is not thread-safe and can cause
cross-test contamination.
"""
def run_infer():
triton_client = grpcclient.InferenceServerClient(url=SERVER_URL) def __init__(self, *, grpc_host: str = "localhost", grpc_port: int = 8000):
self._grpc_host = grpc_host
model_name = "echo" self._grpc_port = int(grpc_port)
# Infer def _server_url(self) -> str:
inputs = [] return f"{self._grpc_host}:{self._grpc_port}"
inputs.append(grpcclient.InferInput("INPUT0", [16], "INT32"))
inputs.append(grpcclient.InferInput("INPUT1", [16], "BYTES")) def _client(self) -> grpcclient.InferenceServerClient:
return grpcclient.InferenceServerClient(url=self._server_url())
# Create the data for the two input tensors. Initialize the first
# to unique integers and the second to all ones. def check_health(self) -> None:
input0_data = np.arange(start=0, stop=16, dtype=np.int32).reshape([16]) triton_client = self._client()
input1_data = np.array( assert triton_client.is_server_live()
[str(x).encode("utf-8") for x in input0_data.reshape(input0_data.size)], assert triton_client.is_server_ready()
dtype=np.object_, assert triton_client.is_model_ready("echo")
).reshape([16])
def run_infer(self) -> None:
# Initialize the data triton_client = self._client()
inputs[0].set_data_from_numpy(input0_data) model_name = "echo"
inputs[1].set_data_from_numpy(input1_data)
inputs = [
# Test with outputs grpcclient.InferInput("INPUT0", [16], "INT32"),
results = triton_client.infer(model_name=model_name, inputs=inputs) grpcclient.InferInput("INPUT1", [16], "BYTES"),
]
# Get the output arrays from the results
output0_data = results.as_numpy("INPUT0") input0_data = np.arange(start=0, stop=16, dtype=np.int32).reshape([16])
output1_data = results.as_numpy("INPUT1") input1_data = np.array(
[str(x).encode("utf-8") for x in input0_data.reshape(input0_data.size)],
assert np.array_equal(input0_data, output0_data) dtype=np.object_,
assert np.array_equal(input1_data, output1_data) ).reshape([16])
inputs[0].set_data_from_numpy(input0_data)
def get_config(): inputs[1].set_data_from_numpy(input1_data)
triton_client = grpcclient.InferenceServerClient(url=SERVER_URL)
results = triton_client.infer(model_name=model_name, inputs=inputs)
model_name = "echo"
response = triton_client.get_model_config(model_name=model_name) output0_data = results.as_numpy("INPUT0")
# Check one of the field that can only be set by providing Triton model config output1_data = results.as_numpy("INPUT1")
assert response.config.model_transaction_policy.decoupled
assert (
output0_data is not None
), "Expected response to include output tensor 'INPUT0'"
assert (
output1_data is not None
), "Expected response to include output tensor 'INPUT1'"
assert np.array_equal(input0_data, output0_data)
assert np.array_equal(input1_data, output1_data)
def get_config(self) -> None:
    """Fetch the "echo" model config and assert that it is marked decoupled."""
    config_response = self._client().get_model_config(model_name="echo")
    # Check one of the fields that can only be set by providing a Triton model config.
    transaction_policy = config_response.config.model_transaction_policy
    assert transaction_policy.decoupled
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# Parallelization: Hermetic test (xdist-safe via dynamic ports).
# Tested on: Linux (Ubuntu 24.04 container), Intel(R) Core(TM) i9-14900K, 32 vCPU.
# post_merge wall time:
# - Serialized: 97.29s.
# - Parallel (-n auto): 30.29s (67.00s saved, 3.21x).
# GPU Requirement: gpu_0 (CPU-only, mocker does not use GPU)
from __future__ import annotations from __future__ import annotations
import logging import logging
import os
import shutil
import time import time
from typing import Any, Dict from typing import Any, Dict
import pytest import pytest
import requests import requests
from tests.conftest import EtcdServer, NatsServer
from tests.utils.constants import QWEN from tests.utils.constants import QWEN
from tests.utils.managed_process import ManagedProcess
from tests.utils.payloads import check_models_api
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -24,99 +25,16 @@ TEST_MODEL = QWEN ...@@ -24,99 +25,16 @@ TEST_MODEL = QWEN
pytestmark = [ pytestmark = [
pytest.mark.e2e, pytest.mark.e2e,
pytest.mark.gpu_1, pytest.mark.gpu_0, # Mocker is CPU-only (no GPU required)
pytest.mark.post_merge, pytest.mark.post_merge,
pytest.mark.parallel,
pytest.mark.model(TEST_MODEL), pytest.mark.model(TEST_MODEL),
] ]
class DynamoFrontendProcess(ManagedProcess):
    """Process manager for Dynamo frontend"""

    def __init__(self, request):
        """Configure the frontend process for the test identified by ``request``.

        Args:
            request: pytest request object; ``request.node.name`` is used to
                build the log directory name.
        """
        log_dir = f"{request.node.name}_frontend"

        # Start from a clean log directory; a missing directory is not an error.
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass
        else:
            logger.info(f"Cleaned up existing log directory: {log_dir}")

        # Copy the environment without DYN_SYSTEM_PORT - frontend doesn't use
        # system metrics server.
        frontend_env = {
            name: value
            for name, value in os.environ.items()
            if name != "DYN_SYSTEM_PORT"
        }

        super().__init__(
            command=["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"],
            env=frontend_env,
            display_output=True,
            terminate_existing=True,
            log_dir=log_dir,
        )
class MockWorkerProcess(ManagedProcess):
    """Process manager for a ``dynamo.mocker`` worker.

    The worker is configured with two health checks: the frontend's
    ``/v1/models`` endpoint and the worker's own ``/health`` endpoint on its
    system port.

    Args:
        request: pytest request object; ``request.node.name`` is used to build
            the log directory name.
        worker_id: Label used in log messages and in the log directory name.
        frontend_port: Port of the frontend polled at ``/v1/models``.
            Defaults to 8000, the value that used to be hard-coded.
        system_port: Port exported to the worker as ``DYN_SYSTEM_PORT`` and
            polled at ``/health``. Defaults to 8083, the value that used to be
            hard-coded.
    """

    def __init__(
        self,
        request,
        worker_id: str = "mocker-worker",
        frontend_port: int = 8000,
        system_port: int = 8083,
    ):
        self.worker_id = worker_id

        command = [
            "python3",
            "-m",
            "dynamo.mocker",
            "--model-path",
            TEST_MODEL,
            "--speedup-ratio",
            "100",
        ]

        env = os.environ.copy()
        env["DYN_LOG"] = "debug"
        env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
        # Keep the exported port and the health-check URL below in sync.
        env["DYN_SYSTEM_PORT"] = str(system_port)

        log_dir = f"{request.node.name}_{worker_id}"
        # Start from a clean log directory; a missing directory is not an error.
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass

        super().__init__(
            command=command,
            env=env,
            health_check_urls=[
                (f"http://localhost:{frontend_port}/v1/models", check_models_api),
                (f"http://localhost:{system_port}/health", self.is_ready),
            ],
            timeout=300,
            display_output=True,
            terminate_existing=False,
            stragglers=["VLLM::EngineCore"],
            straggler_commands=["-m dynamo.mocker"],
            log_dir=log_dir,
        )

    def is_ready(self, response) -> bool:
        """Return True when the health response body has ``status == "ready"``.

        A body that is not valid JSON, or whose JSON value is not an object,
        is treated as "not ready" instead of raising.
        """
        try:
            body = response.json()
        except ValueError:
            logger.warning("%s health response is not valid JSON", self.worker_id)
            return False

        # A JSON list/string/number has no "status" key; report not ready
        # rather than failing with AttributeError on `.get`.
        status = body.get("status") if isinstance(body, dict) else None

        is_ready = status == "ready"
        if is_ready:
            logger.info("%s status is ready", self.worker_id)
        else:
            logger.warning("%s status is not ready: %s", self.worker_id, status)
        return is_ready
def _send_completion_request( def _send_completion_request(
payload: Dict[str, Any], payload: Dict[str, Any],
frontend_port: int,
timeout: int = 180, timeout: int = 180,
) -> requests.Response: ) -> requests.Response:
"""Send a text completion request""" """Send a text completion request"""
...@@ -125,7 +43,7 @@ def _send_completion_request( ...@@ -125,7 +43,7 @@ def _send_completion_request(
print(f"Sending request: {time.time()}") print(f"Sending request: {time.time()}")
response = requests.post( response = requests.post(
"http://localhost:8000/v1/completions", f"http://localhost:{frontend_port}/v1/completions",
headers=headers, headers=headers,
json=payload, json=payload,
timeout=timeout, timeout=timeout,
...@@ -133,33 +51,15 @@ def _send_completion_request( ...@@ -133,33 +51,15 @@ def _send_completion_request(
return response return response
@pytest.fixture(scope="module") def test_completion_string_prompt(start_services_with_mocker) -> None:
def runtime_services(request): frontend_port = start_services_with_mocker
"""Module-scoped runtime services for this test file."""
with NatsServer(request) as nats_process:
with EtcdServer(request) as etcd_process:
yield nats_process, etcd_process
@pytest.fixture(scope="module")
def start_services(request, runtime_services, predownload_tokenizers):
"""Start frontend and worker processes once for this module's tests."""
with DynamoFrontendProcess(request):
logger.info("Frontend started for tests")
with MockWorkerProcess(request):
logger.info("Worker started for tests")
yield
@pytest.mark.usefixtures("start_services")
def test_completion_string_prompt() -> None:
payload: Dict[str, Any] = { payload: Dict[str, Any] = {
"model": TEST_MODEL, "model": TEST_MODEL,
"prompt": "Tell me about Mars", "prompt": "Tell me about Mars",
"max_tokens": 2000, "max_tokens": 2000,
} }
response = _send_completion_request(payload) response = _send_completion_request(payload, frontend_port)
assert response.status_code == 200, ( assert response.status_code == 200, (
f"Completion request failed with status " f"Completion request failed with status "
...@@ -167,15 +67,15 @@ def test_completion_string_prompt() -> None: ...@@ -167,15 +67,15 @@ def test_completion_string_prompt() -> None:
) )
@pytest.mark.usefixtures("start_services") def test_completion_empty_array_prompt(start_services_with_mocker) -> None:
def test_completion_empty_array_prompt() -> None: frontend_port = start_services_with_mocker
payload: Dict[str, Any] = { payload: Dict[str, Any] = {
"model": TEST_MODEL, "model": TEST_MODEL,
"prompt": [], "prompt": [],
"max_tokens": 2000, "max_tokens": 2000,
} }
response = _send_completion_request(payload) response = _send_completion_request(payload, frontend_port)
assert response.status_code == 400, ( assert response.status_code == 400, (
f"Completion request should failed with status 400 but got" f"Completion request should failed with status 400 but got"
...@@ -183,15 +83,15 @@ def test_completion_empty_array_prompt() -> None: ...@@ -183,15 +83,15 @@ def test_completion_empty_array_prompt() -> None:
) )
@pytest.mark.usefixtures("start_services") def test_completion_single_element_array_prompt(start_services_with_mocker) -> None:
def test_completion_single_element_array_prompt() -> None: frontend_port = start_services_with_mocker
payload: Dict[str, Any] = { payload: Dict[str, Any] = {
"model": TEST_MODEL, "model": TEST_MODEL,
"prompt": ["Tell me about Mars"], "prompt": ["Tell me about Mars"],
"max_tokens": 2000, "max_tokens": 2000,
} }
response = _send_completion_request(payload) response = _send_completion_request(payload, frontend_port)
assert response.status_code == 200, ( assert response.status_code == 200, (
f"Completion request failed with status " f"Completion request failed with status "
...@@ -199,8 +99,8 @@ def test_completion_single_element_array_prompt() -> None: ...@@ -199,8 +99,8 @@ def test_completion_single_element_array_prompt() -> None:
) )
@pytest.mark.usefixtures("start_services") def test_completion_multi_element_array_prompt(start_services_with_mocker) -> None:
def test_completion_multi_element_array_prompt() -> None: frontend_port = start_services_with_mocker
payload: Dict[str, Any] = { payload: Dict[str, Any] = {
"model": TEST_MODEL, "model": TEST_MODEL,
"prompt": [ "prompt": [
...@@ -211,7 +111,7 @@ def test_completion_multi_element_array_prompt() -> None: ...@@ -211,7 +111,7 @@ def test_completion_multi_element_array_prompt() -> None:
"max_tokens": 300, "max_tokens": 300,
} }
response = _send_completion_request(payload) response = _send_completion_request(payload, frontend_port)
response_data = response.json() response_data = response.json()
assert response.status_code == 200, ( assert response.status_code == 200, (
......
...@@ -13,7 +13,7 @@ from typing import Any, Dict, Optional ...@@ -13,7 +13,7 @@ from typing import Any, Dict, Optional
import pytest import pytest
from dynamo.common.utils.paths import WORKSPACE_DIR from dynamo.common.utils.paths import WORKSPACE_DIR
from tests.serve.conftest import ServicePorts from tests.conftest import ServicePorts
from tests.utils.client import send_request from tests.utils.client import send_request
from tests.utils.constants import DefaultPort from tests.utils.constants import DefaultPort
from tests.utils.engine_process import EngineConfig, EngineProcess from tests.utils.engine_process import EngineConfig, EngineProcess
...@@ -53,31 +53,45 @@ def run_serve_deployment( ...@@ -53,31 +53,45 @@ def run_serve_deployment(
if ports is not None: if ports is not None:
dynamic_frontend_port = int(ports.frontend_port) dynamic_frontend_port = int(ports.frontend_port)
dynamic_system_port1 = int(ports.system_port1) dynamic_system_ports = [int(p) for p in ports.system_ports]
dynamic_system_port2 = int(ports.system_port2)
# The environments are used by the bash scripts to set the ports. # The environments are used by the bash scripts to set the ports.
merged_env.update( merged_env["DYN_HTTP_PORT"] = str(dynamic_frontend_port)
{
"DYN_HTTP_PORT": str(dynamic_frontend_port), # If no system ports are provided, explicitly ensure we don't pass any
# Alias for PORT1 (many scripts only read this). # stale DYN_SYSTEM_PORT* values via extra_env.
"DYN_SYSTEM_PORT": str(dynamic_system_port1), if not dynamic_system_ports:
"DYN_SYSTEM_PORT1": str(dynamic_system_port1), for k in list(merged_env.keys()):
"DYN_SYSTEM_PORT2": str(dynamic_system_port2), if k == "DYN_SYSTEM_PORT":
} merged_env.pop(k, None)
) continue
if k.startswith("DYN_SYSTEM_PORT") and k != "DYN_SYSTEM_PORT":
suffix = k.removeprefix("DYN_SYSTEM_PORT")
if suffix.isdigit():
merged_env.pop(k, None)
else:
# Alias for PORT1 (many scripts only read this).
merged_env["DYN_SYSTEM_PORT"] = str(dynamic_system_ports[0])
merged_env["DYN_SYSTEM_PORT1"] = str(dynamic_system_ports[0])
for idx, port in enumerate(dynamic_system_ports, start=1):
merged_env[f"DYN_SYSTEM_PORT{idx}"] = str(port)
# Ensure EngineProcess health checks hit the correct frontend port. # Ensure EngineProcess health checks hit the correct frontend port.
config = dataclasses.replace(config, frontend_port=dynamic_frontend_port) config = dataclasses.replace(config, frontend_port=dynamic_frontend_port)
else: else:
# Backward compat: infer from config/extra_env if no explicit ports are passed. # Backward compat: infer from config/extra_env if no explicit ports are passed.
dynamic_frontend_port = int(config.frontend_port) dynamic_frontend_port = int(config.frontend_port)
dynamic_system_port1 = int( # Preserve the historical two-port behavior in this branch. Tests that
merged_env.get("DYN_SYSTEM_PORT1") # need tighter control should pass `ports=...` to avoid default port
or merged_env.get("DYN_SYSTEM_PORT") # collisions under xdist.
or DefaultPort.SYSTEM1.value dynamic_system_ports = [
) int(
dynamic_system_port2 = int( merged_env.get("DYN_SYSTEM_PORT1")
merged_env.get("DYN_SYSTEM_PORT2") or DefaultPort.SYSTEM2.value or merged_env.get("DYN_SYSTEM_PORT")
) or DefaultPort.SYSTEM1.value
),
int(merged_env.get("DYN_SYSTEM_PORT2") or DefaultPort.SYSTEM2.value),
]
with EngineProcess.from_script( with EngineProcess.from_script(
config, request, extra_env=merged_env config, request, extra_env=merged_env
...@@ -96,9 +110,19 @@ def run_serve_deployment( ...@@ -96,9 +110,19 @@ def run_serve_deployment(
# worker system ports (mapped from DefaultPort -> per-test ports). # worker system ports (mapped from DefaultPort -> per-test ports).
if getattr(payload, "endpoint", "") == "/metrics": if getattr(payload, "endpoint", "") == "/metrics":
if payload.port == DefaultPort.SYSTEM1.value: if payload.port == DefaultPort.SYSTEM1.value:
payload.port = dynamic_system_port1 if len(dynamic_system_ports) < 1:
raise RuntimeError(
"Payload targets SYSTEM_PORT1 but no system ports were provided "
f"(payload={payload.__class__.__name__})"
)
payload.port = dynamic_system_ports[0]
elif payload.port == DefaultPort.SYSTEM2.value: elif payload.port == DefaultPort.SYSTEM2.value:
payload.port = dynamic_system_port2 if len(dynamic_system_ports) < 2:
raise RuntimeError(
"Payload targets SYSTEM_PORT2 but only 1 system port was provided "
f"(payload={payload.__class__.__name__})"
)
payload.port = dynamic_system_ports[1]
else: else:
payload.port = dynamic_frontend_port payload.port = dynamic_frontend_port
...@@ -109,9 +133,19 @@ def run_serve_deployment( ...@@ -109,9 +133,19 @@ def run_serve_deployment(
mapped_system_ports: list[int] = [] mapped_system_ports: list[int] = []
for p in payload.system_ports: for p in payload.system_ports:
if p == DefaultPort.SYSTEM1.value: if p == DefaultPort.SYSTEM1.value:
mapped_system_ports.append(dynamic_system_port1) if len(dynamic_system_ports) < 1:
raise RuntimeError(
"Payload.system_ports includes SYSTEM_PORT1 but no system ports were provided "
f"(payload={payload.__class__.__name__})"
)
mapped_system_ports.append(dynamic_system_ports[0])
elif p == DefaultPort.SYSTEM2.value: elif p == DefaultPort.SYSTEM2.value:
mapped_system_ports.append(dynamic_system_port2) if len(dynamic_system_ports) < 2:
raise RuntimeError(
"Payload.system_ports includes SYSTEM_PORT2 but only 1 system port was provided "
f"(payload={payload.__class__.__name__})"
)
mapped_system_ports.append(dynamic_system_ports[1])
else: else:
mapped_system_ports.append(p) mapped_system_ports.append(p)
payload.system_ports = mapped_system_ports payload.system_ports = mapped_system_ports
......
...@@ -2,16 +2,12 @@ ...@@ -2,16 +2,12 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import os import os
from dataclasses import dataclass
from typing import Generator
import pytest import pytest
from pytest_httpserver import HTTPServer from pytest_httpserver import HTTPServer
from dynamo.common.utils.paths import WORKSPACE_DIR from dynamo.common.utils.paths import WORKSPACE_DIR
from tests.serve.lora_utils import MinioLoraConfig, MinioService from tests.serve.lora_utils import MinioLoraConfig, MinioService
from tests.utils.constants import DefaultPort
from tests.utils.port_utils import allocate_port, allocate_ports, deallocate_ports
# Shared constants for multimodal testing # Shared constants for multimodal testing
IMAGE_SERVER_PORT = 8765 IMAGE_SERVER_PORT = 8765
...@@ -21,38 +17,6 @@ MULTIMODAL_IMG_PATH = os.path.join( ...@@ -21,38 +17,6 @@ MULTIMODAL_IMG_PATH = os.path.join(
MULTIMODAL_IMG_URL = f"http://localhost:{IMAGE_SERVER_PORT}/llm-graphic.png" MULTIMODAL_IMG_URL = f"http://localhost:{IMAGE_SERVER_PORT}/llm-graphic.png"
@dataclass(frozen=True)
class ServicePorts:
    """Immutable bundle of the ports used by one serve-style deployment."""

    # HTTP ingress port for dynamo.frontend.
    frontend_port: int
    # Worker metrics/system ports.
    system_port1: int
    system_port2: int
@pytest.fixture(scope="function")
def dynamo_dynamic_ports() -> Generator[ServicePorts, None, None]:
    """Allocate per-test ports for serve-style deployments.

    - frontend_port: OpenAI-compatible HTTP ingress (dynamo.frontend)
    - system_port1/system_port2: worker metrics/system ports (used by some scripts)

    Every port that was successfully allocated is released on teardown, and
    also when a later allocation step fails, so a failure while allocating the
    system ports does not leak the frontend port.

    Note: some disaggregated launch scripts can spawn more than two workers; if/when
    serve tests start exercising those scripts, we'll extend this fixture to allocate
    additional system ports (e.g. system_port3+ / DYN_SYSTEM_PORT3+).
    """
    # Track ports as they are acquired so the `finally` releases exactly those.
    allocated = []
    try:
        frontend_port = allocate_port(DefaultPort.FRONTEND.value)
        allocated.append(frontend_port)

        system_ports = allocate_ports(2, DefaultPort.SYSTEM1.value)
        allocated.extend(system_ports)

        yield ServicePorts(
            frontend_port=frontend_port,
            system_port1=system_ports[0],
            system_port2=system_ports[1],
        )
    finally:
        # Nothing to release if the very first allocation failed.
        if allocated:
            deallocate_ports(allocated)
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def httpserver_listen_address(): def httpserver_listen_address():
return ("127.0.0.1", IMAGE_SERVER_PORT) return ("127.0.0.1", IMAGE_SERVER_PORT)
......
...@@ -253,14 +253,21 @@ def sglang_config_test(request): ...@@ -253,14 +253,21 @@ def sglang_config_test(request):
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.sglang @pytest.mark.sglang
# Use 2 system ports because some `sglang_configs` validate metrics on multiple ports.
# This test iterates over all configs via `sglang_config_test`.
@pytest.mark.parametrize("num_system_ports", [2], indirect=True)
def test_sglang_deployment( def test_sglang_deployment(
sglang_config_test, sglang_config_test,
request, request,
runtime_services_dynamic_ports, runtime_services_dynamic_ports,
dynamo_dynamic_ports, dynamo_dynamic_ports,
num_system_ports,
predownload_models, predownload_models,
): ):
"""Test SGLang deployment scenarios using common helpers""" """Test SGLang deployment scenarios using common helpers"""
assert (
num_system_ports >= 2
), "serve tests require at least SYSTEM_PORT1 + SYSTEM_PORT2"
config = dataclasses.replace( config = dataclasses.replace(
sglang_config_test, frontend_port=dynamo_dynamic_ports.frontend_port sglang_config_test, frontend_port=dynamo_dynamic_ports.frontend_port
) )
......
...@@ -211,16 +211,21 @@ def trtllm_config_test(request): ...@@ -211,16 +211,21 @@ def trtllm_config_test(request):
@pytest.mark.trtllm @pytest.mark.trtllm
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.parametrize("num_system_ports", [2], indirect=True)
def test_deployment( def test_deployment(
trtllm_config_test, trtllm_config_test,
request, request,
runtime_services_dynamic_ports, runtime_services_dynamic_ports,
dynamo_dynamic_ports, dynamo_dynamic_ports,
num_system_ports,
predownload_models, predownload_models,
): ):
""" """
Test dynamo deployments with different configurations. Test dynamo deployments with different configurations.
""" """
assert (
num_system_ports >= 2
), "serve tests require at least SYSTEM_PORT1 + SYSTEM_PORT2"
# Use per-test ports so tests can run safely under pytest-xdist. # Use per-test ports so tests can run safely under pytest-xdist.
config = dataclasses.replace( config = dataclasses.replace(
trtllm_config_test, frontend_port=dynamo_dynamic_ports.frontend_port trtllm_config_test, frontend_port=dynamo_dynamic_ports.frontend_port
......
...@@ -617,7 +617,10 @@ def test_serve_deployment( ...@@ -617,7 +617,10 @@ def test_serve_deployment(
@pytest.mark.gpu_2 @pytest.mark.gpu_2
@pytest.mark.timeout(360) # Match VLLMConfig.timeout for this multimodal deployment @pytest.mark.timeout(360) # Match VLLMConfig.timeout for this multimodal deployment
def test_multimodal_b64( def test_multimodal_b64(
request, runtime_services_dynamic_ports, dynamo_dynamic_ports, predownload_models request,
runtime_services_dynamic_ports,
dynamo_dynamic_ports,
predownload_models,
): ):
""" """
Test multimodal inference with base64 url passthrough. Test multimodal inference with base64 url passthrough.
...@@ -764,12 +767,14 @@ def test_lora_aggregated( ...@@ -764,12 +767,14 @@ def test_lora_aggregated(
@pytest.mark.model("Qwen/Qwen3-0.6B") @pytest.mark.model("Qwen/Qwen3-0.6B")
@pytest.mark.timeout(600) @pytest.mark.timeout(600)
@pytest.mark.post_merge @pytest.mark.post_merge
@pytest.mark.parametrize("num_system_ports", [2], indirect=True)
def test_lora_aggregated_router( def test_lora_aggregated_router(
request, request,
runtime_services_dynamic_ports, runtime_services_dynamic_ports,
predownload_models, predownload_models,
minio_lora_service, minio_lora_service,
dynamo_dynamic_ports, dynamo_dynamic_ports,
num_system_ports,
): ):
""" """
Test LoRA inference with aggregated vLLM deployment using KV router. Test LoRA inference with aggregated vLLM deployment using KV router.
...@@ -780,6 +785,9 @@ def test_lora_aggregated_router( ...@@ -780,6 +785,9 @@ def test_lora_aggregated_router(
3. Loads the LoRA adapter on both workers via system API 3. Loads the LoRA adapter on both workers via system API
4. Runs inference with the LoRA model, verifying KV cache routing 4. Runs inference with the LoRA model, verifying KV cache routing
""" """
assert (
num_system_ports >= 2
), "serve tests require at least SYSTEM_PORT1 + SYSTEM_PORT2"
minio_config: MinioLoraConfig = minio_lora_service minio_config: MinioLoraConfig = minio_lora_service
# Create payloads that load LoRA on both workers and test inference # Create payloads that load LoRA on both workers and test inference
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json import json
import logging import logging
...@@ -20,6 +8,7 @@ import shutil ...@@ -20,6 +8,7 @@ import shutil
import signal import signal
import socket import socket
import subprocess import subprocess
import tempfile
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, List, Optional from typing import Any, List, Optional
...@@ -119,6 +108,17 @@ class ManagedProcess: ...@@ -119,6 +108,17 @@ class ManagedProcess:
try: try:
self._logger = logging.getLogger(self.__class__.__name__) self._logger = logging.getLogger(self.__class__.__name__)
self._command_name = self.command[0] self._command_name = self.command[0]
# Keep test logs out of the git working tree: many tests pass a relative
# `log_dir` derived from `request.node.name`, which otherwise creates a large
# number of untracked directories under the repo root during pytest runs.
if not os.path.isabs(self.log_dir):
log_root = os.environ.get(
"DYN_TEST_OUTPUT_PATH",
os.path.join(tempfile.gettempdir(), "dynamo_tests"),
)
self.log_dir = os.path.join(log_root, self.log_dir)
os.makedirs(self.log_dir, exist_ok=True) os.makedirs(self.log_dir, exist_ok=True)
log_name = f"{self._command_name}.log.txt" log_name = f"{self._command_name}.log.txt"
self._log_path = os.path.join(self.log_dir, log_name) self._log_path = os.path.join(self.log_dir, log_name)
...@@ -595,14 +595,20 @@ class DynamoFrontendProcess(ManagedProcess): ...@@ -595,14 +595,20 @@ class DynamoFrontendProcess(ManagedProcess):
router_mode: str = "round-robin", router_mode: str = "round-robin",
extra_args: Optional[list[str]] = None, extra_args: Optional[list[str]] = None,
extra_env: Optional[dict[str, str]] = None, extra_env: Optional[dict[str, str]] = None,
terminate_existing: bool = True, # Default to false so pytest-xdist workers don't kill each other's frontends.
terminate_existing: bool = False,
): ):
# TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to # TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to
# use this shared implementation (and delete the copies): # use this shared implementation (and delete the copies):
# - tests/frontend/test_vllm.py # - tests/frontend/test_vllm.py
# - tests/frontend/test_completion_mocker_engine.py # - tests/router/common.py
# - tests/frontend/grpc/test_tensor_parameters.py # - tests/router/test_router_e2e_with_vllm.py
# - tests/frontend/grpc/test_tensor_mocker_engine.py # - tests/router/test_router_e2e_with_sglang.py
# - tests/router/test_router_e2e_with_trtllm.py
# - tests/fault_tolerance/cancellation/utils.py
# - tests/fault_tolerance/migration/utils.py
# - tests/fault_tolerance/etcd_ha/utils.py
# - tests/fault_tolerance/test_vllm_health_check.py
self._allocated_http_port: Optional[int] = None self._allocated_http_port: Optional[int] = None
if frontend_port == 0: if frontend_port == 0:
# Treat `0` as "allocate a random free port" for xdist-safe tests. # Treat `0` as "allocate a random free port" for xdist-safe tests.
......
...@@ -15,6 +15,7 @@ import random ...@@ -15,6 +15,7 @@ import random
import socket import socket
import tempfile import tempfile
import time import time
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
# Port allocation lock file # Port allocation lock file
...@@ -27,6 +28,18 @@ _PORT_MIN = 1024 ...@@ -27,6 +28,18 @@ _PORT_MIN = 1024
_PORT_MAX = 32767 _PORT_MAX = 32767
@dataclass(frozen=True)
class ServicePorts:
    """Cohesive set of ports for one Dynamo service deployment.

    Lets tests hand around the frontend port together with one or more
    worker/system ports as a single value.
    """

    frontend_port: int
    # NOTE(review): the dataclass is frozen, so the attribute cannot be rebound,
    # but the list itself can still be mutated in place.
    system_ports: list[int]
def _load_port_registry() -> dict: def _load_port_registry() -> dict:
"""Load the port registry from disk. """Load the port registry from disk.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment