Unverified commit e076f3a2, authored by Graham King, committed by GitHub
Browse files

test: Auto-start services for prompt_embed_tests.py (#5491)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent d953f9d0
...@@ -13,29 +13,163 @@ These tests validate behavior that cannot be covered by Rust unit tests: ...@@ -13,29 +13,163 @@ These tests validate behavior that cannot be covered by Rust unit tests:
Validation tests (base64, size limits, empty prompt) are covered by Rust unit tests Validation tests (base64, size limits, empty prompt) are covered by Rust unit tests
in lib/llm/src/protocols/openai/completions.rs in lib/llm/src/protocols/openai/completions.rs
Run with: pytest tests/frontend/test_prompt_embeds.py -v
""" """
from __future__ import annotations
import base64 import base64
import concurrent.futures import concurrent.futures
import io import io
import logging import logging
import os
import shutil
from typing import Generator
import pytest import pytest
import torch import torch
from openai import OpenAI from openai import OpenAI
from tests.utils.managed_process import DynamoFrontendProcess, ManagedProcess
from tests.utils.payloads import check_models_api
from tests.utils.port_utils import ServicePorts
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Test model - small and fast for CI # Test model - small and fast for CI
TEST_MODEL = "Qwen/Qwen3-0.6B" TEST_MODEL = "Qwen/Qwen3-0.6B"
# Module-level marks: pytest applies every entry of ``pytestmark`` to each test
# collected from this module, so individual test classes do not need to repeat
# these decorators.
pytestmark = [
    pytest.mark.integration,
    pytest.mark.vllm,
    pytest.mark.nightly,
    pytest.mark.gpu_1,
    pytest.mark.model(TEST_MODEL),
]
class VllmPromptEmbedsWorkerProcess(ManagedProcess):
    """Vllm Worker process configured for prompt embeddings testing.

    Uses file-based KV store and TCP request plane (no NATS/etcd required).
    The worker is launched as ``python3 -m dynamo.vllm`` with
    ``--enable-prompt-embeds`` and KV cache events disabled.
    """

    def __init__(
        self,
        request,
        *,
        frontend_port: int,
        system_port: int,
        worker_id: str = "vllm-prompt-embeds-worker",
    ):
        """Build the worker command line and environment, then hand off to ManagedProcess.

        Args:
            request: The pytest ``request`` object; ``request.node.name`` is
                used to derive the log directory name.
            frontend_port: Port of the frontend whose ``/v1/models`` endpoint
                is polled as part of the health check.
            system_port: Port exported to the worker as ``DYN_SYSTEM_PORT``
                and polled at ``/health``.
            worker_id: Label used in log messages and in the log directory name.
        """
        self.worker_id = worker_id
        self.frontend_port = int(frontend_port)
        self.system_port = int(system_port)

        command = [
            "python3",
            "-m",
            "dynamo.vllm",
            "--model",
            TEST_MODEL,
            "--connector",
            "none",
            "--max-model-len",
            "4096",
            "--store-kv",
            "file",
            "--request-plane",
            "tcp",
            "--enable-prompt-embeds",
            "--kv-events-config",
            '{"enable_kv_cache_events": false}',
        ]

        # Start from the parent environment so the child inherits whatever the
        # test session already exported, then add the worker-specific settings.
        env = os.environ.copy()
        env["DYN_LOG"] = "debug"
        # NOTE(review): presumably makes /health reflect the "generate"
        # endpoint's status -- confirm against the runtime's documentation.
        env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
        env["DYN_SYSTEM_PORT"] = str(self.system_port)

        # Remove any existing log directory with the same name; a missing
        # directory (e.g. on the first run) is not an error.
        log_dir = f"{request.node.name}_{worker_id}"
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass

        super().__init__(
            command=command,
            env=env,
            # Each entry pairs a URL with the callable that judges its response.
            health_check_urls=[
                (f"http://localhost:{self.frontend_port}/v1/models", check_models_api),
                (f"http://localhost:{self.system_port}/health", self.is_ready),
            ],
            timeout=500,
            display_output=True,
            terminate_existing=False,
            stragglers=["VLLM::EngineCore"],
            straggler_commands=["-m dynamo.vllm"],
            log_dir=log_dir,
        )

    def is_ready(self, response) -> bool:
        """Return True when the health response reports ``"status": "ready"``.

        Args:
            response: HTTP response object exposing a ``json()`` method that
                raises ``ValueError`` on an undecodable body.

        Returns:
            True only if the body is a JSON object whose ``status`` equals
            ``"ready"``. An invalid body, a non-object body, or any other
            status yields False.
        """
        try:
            payload = response.json()
        except ValueError:
            logger.warning("%s health response is not valid JSON", self.worker_id)
            return False

        # A JSON body may legally be a list, string or number; only an object
        # can carry a "status" key. Treat anything else as "no status" rather
        # than letting ``.get`` raise AttributeError inside the health check.
        status = payload.get("status") if isinstance(payload, dict) else None

        ready = status == "ready"
        if ready:
            logger.info("%s status is ready", self.worker_id)
        else:
            logger.warning("%s status is not ready: %s", self.worker_id, status)
        return ready
@pytest.fixture(scope="function")
def start_services(
    request,
    file_storage_backend,
    dynamo_dynamic_ports: ServicePorts,
    predownload_models,
) -> Generator[ServicePorts, None, None]:
    """Bring up the frontend and a vllm worker for the prompt embeds tests.

    Both processes run with the file-based KV store and the TCP request plane,
    so no NATS/etcd instance is needed. The frontend is started first and the
    worker second; the fixture then yields the allocated ports and both
    context managers are exited once the test is done.

    The `file_storage_backend` fixture sets up a temporary directory and
    configures the DYN_FILE_KV environment variable; `predownload_models`
    ensures the model is downloaded before the services start.
    """
    # Requested purely for their setup side effects; the values are unused.
    _ = (file_storage_backend, predownload_models)

    http_port = dynamo_dynamic_ports.frontend_port
    worker_system_port = dynamo_dynamic_ports.system_ports[0]

    frontend = DynamoFrontendProcess(
        request,
        frontend_port=http_port,
        terminate_existing=False,
        extra_args=["--store-kv", "file", "--request-plane", "tcp"],
    )
    with frontend:
        logger.info("Frontend started for prompt embeds tests")

        # The worker is constructed only after the frontend is up.
        worker = VllmPromptEmbedsWorkerProcess(
            request,
            frontend_port=http_port,
            system_port=worker_system_port,
        )
        with worker:
            logger.info("Vllm Worker with prompt embeds started for tests")
            yield dynamo_dynamic_ports
@pytest.fixture
def dynamo_client(start_services: ServicePorts) -> OpenAI:
    """Create OpenAI client pointing to Dynamo frontend on the allocated port.

    Args:
        start_services: Ports yielded by the ``start_services`` fixture;
            depending on it also ensures the frontend and worker are running
            before the client is used.

    Returns:
        An ``OpenAI`` client whose base URL targets
        ``http://localhost:<frontend_port>/v1``.
    """
    return OpenAI(
        # Placeholder key passed to the client constructor.
        api_key="EMPTY",
        base_url=f"http://localhost:{start_services.frontend_port}/v1",
    )
...@@ -48,11 +182,6 @@ def create_embeddings_base64(shape: tuple[int, ...]) -> str: ...@@ -48,11 +182,6 @@ def create_embeddings_base64(shape: tuple[int, ...]) -> str:
return base64.b64encode(buffer.read()).decode("utf-8") return base64.b64encode(buffer.read()).decode("utf-8")
@pytest.mark.integration
@pytest.mark.vllm
@pytest.mark.nightly
@pytest.mark.gpu_1
@pytest.mark.model(TEST_MODEL)
class TestPromptEmbedsE2E: class TestPromptEmbedsE2E:
""" """
End-to-end tests for prompt embeddings. End-to-end tests for prompt embeddings.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment