"vscode:/vscode.git/clone" did not exist on "e176a29c713400d84e3db53169c29542ff58a364"
Unverified commit 1b2826a9, authored by Alec, committed by GitHub
Browse files

ci: remove not slow filter (#2944)


Signed-off-by: alec-flowers <aflowers@nvidia.com>
parent 2eced093
...@@ -22,7 +22,7 @@ jobs: ...@@ -22,7 +22,7 @@ jobs:
pytest_marks: "e2e and vllm and gpu_1 and not slow" pytest_marks: "e2e and vllm and gpu_1 and not slow"
- framework: sglang - framework: sglang
target: runtime target: runtime
pytest_marks: "e2e and sglang and gpu_1" pytest_marks: "e2e and sglang and gpu_1 and not slow"
# Do not cancel main branch runs # Do not cancel main branch runs
concurrency: concurrency:
......
...@@ -178,7 +178,8 @@ markers = [ ...@@ -178,7 +178,8 @@ markers = [
"sglang: marks tests as requiring sglang", "sglang: marks tests as requiring sglang",
"slow: marks tests as known to be slow", "slow: marks tests as known to be slow",
"h100: marks tests to run on H100", "h100: marks tests to run on H100",
"kvbm: marks tests for KV behavior and model determinism" "kvbm: marks tests for KV behavior and model determinism",
"model: model id used by a test or parameter"
] ]
# Linting/formatting # Linting/formatting
......
...@@ -20,26 +20,25 @@ import tempfile ...@@ -20,26 +20,25 @@ import tempfile
import pytest import pytest
from tests.utils.constants import TEST_MODELS
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
# Custom format inspired by your example
def pytest_configure(config):
    """Register the ``model`` marker programmatically.

    Defining the marker here avoids the
    "'model' not found in `markers` configuration option" error when
    pyproject.toml is not available in the container.

    Args:
        config: The pytest config object handed to this hook.
    """
    config.addinivalue_line("markers", "model: model id used by a test or parameter")
LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s" LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s"
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S" DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
# Configure logging
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format=LOG_FORMAT, format=LOG_FORMAT,
datefmt=DATE_FORMAT, # ISO 8601 UTC format datefmt=DATE_FORMAT, # ISO 8601 UTC format
) )
# List of models used in tests
TEST_MODELS = [
"Qwen/Qwen3-0.6B",
"deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"llava-hf/llava-1.5-7b-hf",
]
def download_models(model_list=None, ignore_weights=False): def download_models(model_list=None, ignore_weights=False):
"""Download models - can be called directly or via fixture """Download models - can be called directly or via fixture
...@@ -107,15 +106,33 @@ def download_models(model_list=None, ignore_weights=False): ...@@ -107,15 +106,33 @@ def download_models(model_list=None, ignore_weights=False):
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def predownload_models(): def predownload_models(pytestconfig):
"""Fixture wrapper around download_models for all TEST_MODELS""" """Fixture wrapper around download_models for models used in collected tests"""
# Get models from pytest config if available, otherwise fall back to TEST_MODELS
models = getattr(pytestconfig, "models_to_download", None)
if models:
logging.info(
f"Downloading {len(models)} models needed for collected tests\nModels: {models}"
)
download_models(model_list=list(models))
else:
# Fallback to original behavior if extraction failed
download_models() download_models()
yield yield
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def predownload_tokenizers(): def predownload_tokenizers(pytestconfig):
"""Fixture wrapper around download_models for all TEST_MODELS""" """Fixture wrapper around download_models for tokenizers used in collected tests"""
# Get models from pytest config if available, otherwise fall back to TEST_MODELS
models = getattr(pytestconfig, "models_to_download", None)
if models:
logging.info(
f"Downloading tokenizers for {len(models)} models needed for collected tests\nModels: {models}"
)
download_models(model_list=list(models), ignore_weights=True)
else:
# Fallback to original behavior if extraction failed
download_models(ignore_weights=True) download_models(ignore_weights=True)
yield yield
...@@ -135,42 +152,26 @@ def logger(request): ...@@ -135,42 +152,26 @@ def logger(request):
logger.removeHandler(handler) logger.removeHandler(handler)
@pytest.hookimpl(trylast=True)
def pytest_collection_modifyitems(config, items): def pytest_collection_modifyitems(config, items):
""" """
This function is called to modify the list of tests to run. This function is called to modify the list of tests to run.
It is used to skip tests that are not supported on all environments.
""" """
# Collect models via explicit pytest mark from final filtered items only
# Tests marked with trtllm requires specific environment with tensorrtllm models_to_download = set()
# installed. Hence, we skip them if the user did not explicitly ask for them.
if config.getoption("-m") and "trtllm_marker" in config.getoption("-m"):
return
skip_trtllm = pytest.mark.skip(reason="need -m trtllm_marker to run")
for item in items: for item in items:
if "trtllm_marker" in item.keywords: # Only collect from items that are not skipped
item.add_marker(skip_trtllm) if any(
getattr(m, "name", "") == "skip" for m in getattr(item, "own_markers", [])
# Auto-inject predownload_models fixture for serve tests only (not router tests)
# Skip items that don't have fixturenames (like MypyFileItem)
if hasattr(item, "fixturenames"):
# Guard clause: skip if already has the fixtures
if (
"predownload_models" in item.fixturenames
or "predownload_tokenizers" in item.fixturenames
): ):
continue continue
model_mark = item.get_closest_marker("model")
if model_mark and model_mark.args:
models_to_download.add(model_mark.args[0])
# Guard clause: skip if marked with skip_model_download # Store models to download in pytest config for fixtures to access
if item.get_closest_marker("skip_model_download"): if models_to_download:
continue config.models_to_download = models_to_download
# Add appropriate fixture based on test path
if "serve" in str(item.path):
item.fixturenames = list(item.fixturenames)
item.fixturenames.append("predownload_models")
elif "router" in str(item.path):
item.fixturenames = list(item.fixturenames)
item.fixturenames.append("predownload_tokenizers")
class EtcdServer(ManagedProcess): class EtcdServer(ManagedProcess):
......
...@@ -9,8 +9,8 @@ import time ...@@ -9,8 +9,8 @@ import time
import pytest import pytest
import requests import requests
from huggingface_hub import snapshot_download
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.engine_process import FRONTEND_PORT from tests.utils.engine_process import FRONTEND_PORT
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
from tests.utils.payloads import check_health_generate, check_models_api from tests.utils.payloads import check_health_generate, check_models_api
...@@ -56,7 +56,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -56,7 +56,7 @@ class DynamoWorkerProcess(ManagedProcess):
"-m", "-m",
"dynamo.vllm", "dynamo.vllm",
"--model", "--model",
"deepseek-ai/DeepSeek-R1-Distill-Llama-8B", FAULT_TOLERANCE_MODEL_NAME,
"--enforce-eager", "--enforce-eager",
"--gpu-memory-utilization", "--gpu-memory-utilization",
"0.45", "0.45",
...@@ -137,47 +137,12 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -137,47 +137,12 @@ class DynamoWorkerProcess(ManagedProcess):
return False return False
def download_model() -> None:
    """
    Download the DeepSeek-R1-Distill-Llama-8B model from HuggingFace Hub if not already cached.

    Makes up to 5 attempts and sleeps 30 seconds between failed attempts.

    Raises:
        Exception: Whatever ``snapshot_download`` raised on the final attempt.
    """
    model_id = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
    max_retries = 5
    retry_delay = 30  # seconds

    logger.info(f"Caching model {model_id}...")

    for attempt in range(1, max_retries + 1):
        try:
            # Download the model to the default cache directory
            # This will skip download if the model is already cached
            snapshot_download(
                repo_id=model_id,  # reuse model_id so the id is spelled once
                repo_type="model",
                local_files_only=False,
            )
        except Exception as e:
            if attempt == max_retries:
                # Last attempt failed: log and propagate the original error
                logger.error(
                    f"Failed to download model {model_id} after {max_retries} attempts: {e}"
                )
                raise
            logger.warning(
                f"Failed to download model {model_id} (attempt {attempt}/{max_retries}): {e}"
            )
            logger.info(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            # Success path kept outside the try so only the download is guarded
            logger.info(f"Model {model_id} is ready for use")
            return
def send_completion_request( def send_completion_request(
prompt: str, max_tokens: int, timeout: int = 120 prompt: str, max_tokens: int, timeout: int = 120
) -> requests.Response: ) -> requests.Response:
"""Send a completion request to the frontend""" """Send a completion request to the frontend"""
payload = { payload = {
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B", "model": FAULT_TOLERANCE_MODEL_NAME,
"prompt": prompt, "prompt": prompt,
"max_tokens": max_tokens, "max_tokens": max_tokens,
} }
...@@ -211,7 +176,7 @@ def send_chat_completion_request( ...@@ -211,7 +176,7 @@ def send_chat_completion_request(
) -> requests.Response: ) -> requests.Response:
"""Send a chat completion request to the frontend""" """Send a chat completion request to the frontend"""
payload = { payload = {
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B", "model": FAULT_TOLERANCE_MODEL_NAME,
"messages": [{"role": "user", "content": prompt}], "messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens, "max_tokens": max_tokens,
"stream": stream, "stream": stream,
...@@ -383,8 +348,8 @@ def verify_request_cancelled( ...@@ -383,8 +348,8 @@ def verify_request_cancelled(
@pytest.mark.vllm @pytest.mark.vllm
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.slow @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_cancellation_vllm(request, runtime_services): def test_request_cancellation_vllm(request, runtime_services, predownload_models):
""" """
End-to-end test for request cancellation functionality. End-to-end test for request cancellation functionality.
...@@ -395,8 +360,6 @@ def test_request_cancellation_vllm(request, runtime_services): ...@@ -395,8 +360,6 @@ def test_request_cancellation_vllm(request, runtime_services):
2. Chat completion request (non-streaming) 2. Chat completion request (non-streaming)
3. Chat completion request (streaming) 3. Chat completion request (streaming)
""" """
# Step 0: Download the model from HuggingFace if not already cached
download_model()
# Step 1: Start the frontend # Step 1: Start the frontend
with DynamoFrontendProcess(request) as frontend: with DynamoFrontendProcess(request) as frontend:
...@@ -446,8 +409,10 @@ def test_request_cancellation_vllm(request, runtime_services): ...@@ -446,8 +409,10 @@ def test_request_cancellation_vllm(request, runtime_services):
@pytest.mark.vllm @pytest.mark.vllm
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.slow @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_cancellation_vllm_decode(request, runtime_services): def test_request_cancellation_vllm_decode(
request, runtime_services, predownload_models
):
""" """
End-to-end test for request cancellation functionality with remote prefill. End-to-end test for request cancellation functionality with remote prefill.
...@@ -455,8 +420,6 @@ def test_request_cancellation_vllm_decode(request, runtime_services): ...@@ -455,8 +420,6 @@ def test_request_cancellation_vllm_decode(request, runtime_services):
the system properly handles the cancellation and cleans up resources the system properly handles the cancellation and cleans up resources
on the decode worker side in a disaggregated setup. on the decode worker side in a disaggregated setup.
""" """
# Step 0: Download the model from HuggingFace if not already cached
download_model()
# Step 1: Start the frontend # Step 1: Start the frontend
with DynamoFrontendProcess(request) as frontend: with DynamoFrontendProcess(request) as frontend:
...@@ -501,7 +464,6 @@ def test_request_cancellation_vllm_decode(request, runtime_services): ...@@ -501,7 +464,6 @@ def test_request_cancellation_vllm_decode(request, runtime_services):
@pytest.mark.vllm @pytest.mark.vllm
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.slow
def test_request_cancellation_vllm_prefill(request, runtime_services): def test_request_cancellation_vllm_prefill(request, runtime_services):
""" """
End-to-end test for request cancellation on remote prefill. End-to-end test for request cancellation on remote prefill.
......
...@@ -10,8 +10,8 @@ import time ...@@ -10,8 +10,8 @@ import time
import pytest import pytest
import requests import requests
from huggingface_hub import snapshot_download
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.engine_process import FRONTEND_PORT from tests.utils.engine_process import FRONTEND_PORT
from tests.utils.managed_process import ManagedProcess, terminate_process_tree from tests.utils.managed_process import ManagedProcess, terminate_process_tree
from tests.utils.payloads import check_models_api from tests.utils.payloads import check_models_api
...@@ -54,7 +54,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -54,7 +54,7 @@ class DynamoWorkerProcess(ManagedProcess):
"-m", "-m",
"dynamo.vllm", "dynamo.vllm",
"--model", "--model",
"deepseek-ai/DeepSeek-R1-Distill-Llama-8B", FAULT_TOLERANCE_MODEL_NAME,
"--enforce-eager", "--enforce-eager",
"--gpu-memory-utilization", "--gpu-memory-utilization",
"0.45", "0.45",
...@@ -117,47 +117,12 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -117,47 +117,12 @@ class DynamoWorkerProcess(ManagedProcess):
return False return False
def download_model() -> None:
    """
    Download the DeepSeek-R1-Distill-Llama-8B model from HuggingFace Hub if not already cached.

    Makes up to 5 attempts and sleeps 30 seconds between failed attempts.

    Raises:
        Exception: Whatever ``snapshot_download`` raised on the final attempt.
    """
    model_id = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
    max_retries = 5
    retry_delay = 30  # seconds

    logger.info(f"Caching model {model_id}...")

    for attempt in range(1, max_retries + 1):
        try:
            # Download the model to the default cache directory
            # This will skip download if the model is already cached
            snapshot_download(
                repo_id=model_id,  # reuse model_id so the id is spelled once
                repo_type="model",
                local_files_only=False,
            )
        except Exception as e:
            if attempt == max_retries:
                # Last attempt failed: log and propagate the original error
                logger.error(
                    f"Failed to download model {model_id} after {max_retries} attempts: {e}"
                )
                raise
            logger.warning(
                f"Failed to download model {model_id} (attempt {attempt}/{max_retries}): {e}"
            )
            logger.info(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            # Success path kept outside the try so only the download is guarded
            logger.info(f"Model {model_id} is ready for use")
            return
def send_completion_request( def send_completion_request(
prompt: str, max_tokens: int, timeout: int = 120 prompt: str, max_tokens: int, timeout: int = 120
) -> requests.Response: ) -> requests.Response:
"""Send a completion request to the frontend""" """Send a completion request to the frontend"""
payload = { payload = {
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B", "model": FAULT_TOLERANCE_MODEL_NAME,
"prompt": prompt, "prompt": prompt,
"max_tokens": max_tokens, "max_tokens": max_tokens,
} }
...@@ -324,8 +289,8 @@ def verify_migration_occurred(frontend_process: DynamoFrontendProcess) -> None: ...@@ -324,8 +289,8 @@ def verify_migration_occurred(frontend_process: DynamoFrontendProcess) -> None:
@pytest.mark.vllm @pytest.mark.vllm
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.slow @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_request_migration_vllm(request, runtime_services): def test_request_migration_vllm(request, runtime_services, predownload_models):
""" """
End-to-end test for worker fault tolerance with migration support. End-to-end test for worker fault tolerance with migration support.
...@@ -333,8 +298,6 @@ def test_request_migration_vllm(request, runtime_services): ...@@ -333,8 +298,6 @@ def test_request_migration_vllm(request, runtime_services):
the system can handle the failure gracefully and migrate the request to the system can handle the failure gracefully and migrate the request to
another worker. another worker.
""" """
# Step 0: Download the model from HuggingFace if not already cached
download_model()
# Step 1: Start the frontend # Step 1: Start the frontend
with DynamoFrontendProcess(request) as frontend: with DynamoFrontendProcess(request) as frontend:
......
...@@ -8,8 +8,8 @@ import time ...@@ -8,8 +8,8 @@ import time
import pytest import pytest
import requests import requests
from huggingface_hub import snapshot_download
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.engine_process import FRONTEND_PORT from tests.utils.engine_process import FRONTEND_PORT
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
from tests.utils.payloads import check_models_api, completions_response_handler from tests.utils.payloads import check_models_api, completions_response_handler
...@@ -56,10 +56,8 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -56,10 +56,8 @@ class DynamoWorkerProcess(ManagedProcess):
"-m", "-m",
"dynamo.vllm", "dynamo.vllm",
"--model", "--model",
"deepseek-ai/DeepSeek-R1-Distill-Llama-8B", FAULT_TOLERANCE_MODEL_NAME,
"--enforce-eager", "--enforce-eager",
"--gpu-memory-utilization",
"0.45",
"--max-model-len", "--max-model-len",
"8192", "8192",
"--migration-limit", "--migration-limit",
...@@ -123,47 +121,12 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -123,47 +121,12 @@ class DynamoWorkerProcess(ManagedProcess):
return False return False
def download_model() -> None:
    """
    Download the DeepSeek-R1-Distill-Llama-8B model from HuggingFace Hub if not already cached.

    Makes up to 5 attempts and sleeps 30 seconds between failed attempts.

    Raises:
        Exception: Whatever ``snapshot_download`` raised on the final attempt.
    """
    model_id = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
    max_retries = 5
    retry_delay = 30  # seconds

    logger.info(f"Caching model {model_id}...")

    for attempt in range(1, max_retries + 1):
        try:
            # Download the model to the default cache directory
            # This will skip download if the model is already cached
            snapshot_download(
                repo_id=model_id,  # reuse model_id so the id is spelled once
                repo_type="model",
                local_files_only=False,
            )
        except Exception as e:
            if attempt == max_retries:
                # Last attempt failed: log and propagate the original error
                logger.error(
                    f"Failed to download model {model_id} after {max_retries} attempts: {e}"
                )
                raise
            logger.warning(
                f"Failed to download model {model_id} (attempt {attempt}/{max_retries}): {e}"
            )
            logger.info(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            # Success path kept outside the try so only the download is guarded
            logger.info(f"Model {model_id} is ready for use")
            return
def send_completion_request( def send_completion_request(
prompt: str, max_tokens: int, timeout: int = 120 prompt: str, max_tokens: int, timeout: int = 120
) -> requests.Response: ) -> requests.Response:
"""Send a completion request to the frontend""" """Send a completion request to the frontend"""
payload = { payload = {
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B", "model": FAULT_TOLERANCE_MODEL_NAME,
"prompt": prompt, "prompt": prompt,
"max_tokens": max_tokens, "max_tokens": max_tokens,
} }
...@@ -194,7 +157,7 @@ def send_completion_request( ...@@ -194,7 +157,7 @@ def send_completion_request(
@pytest.mark.vllm @pytest.mark.vllm
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.slow @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_vllm_health_check_active(request, runtime_services): def test_vllm_health_check_active(request, runtime_services):
""" """
End-to-end test for worker fault tolerance with migration support. End-to-end test for worker fault tolerance with migration support.
...@@ -203,8 +166,6 @@ def test_vllm_health_check_active(request, runtime_services): ...@@ -203,8 +166,6 @@ def test_vllm_health_check_active(request, runtime_services):
the system can handle the failure gracefully and migrate the request to the system can handle the failure gracefully and migrate the request to
another worker. another worker.
""" """
# Step 0: Download the model from HuggingFace if not already cached
download_model()
# Step 1: Start the frontend # Step 1: Start the frontend
logger.info("Starting frontend...") logger.info("Starting frontend...")
...@@ -251,8 +212,8 @@ def test_vllm_health_check_active(request, runtime_services): ...@@ -251,8 +212,8 @@ def test_vllm_health_check_active(request, runtime_services):
@pytest.mark.vllm @pytest.mark.vllm
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.slow @pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME)
def test_vllm_health_check_passive(request, runtime_services): def test_vllm_health_check_passive(request, runtime_services, predownload_models):
""" """
End-to-end test for worker fault tolerance with migration support. End-to-end test for worker fault tolerance with migration support.
...@@ -260,8 +221,6 @@ def test_vllm_health_check_passive(request, runtime_services): ...@@ -260,8 +221,6 @@ def test_vllm_health_check_passive(request, runtime_services):
the system can handle the failure gracefully and migrate the request to the system can handle the failure gracefully and migrate the request to
another worker. another worker.
""" """
# Step 0: Download the model from HuggingFace if not already cached
download_model()
# Step 1: Start the frontend # Step 1: Start the frontend
logger.info("Starting frontend...") logger.info("Starting frontend...")
......
...@@ -117,6 +117,8 @@ class LLMServerManager: ...@@ -117,6 +117,8 @@ class LLMServerManager:
"--kv-transfer-config", "--kv-transfer-config",
'{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "dynamo.llm.vllm_integration.connector"}', '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "dynamo.llm.vllm_integration.connector"}',
os.environ.get("KVBM_MODEL_ID", "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"), os.environ.get("KVBM_MODEL_ID", "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"),
"--max-seq-len",
"8000", # required to fit on L4 GPU when using 8b model
] ]
# GPU blocks override # GPU blocks override
......
...@@ -13,13 +13,14 @@ import aiohttp ...@@ -13,13 +13,14 @@ import aiohttp
import pytest import pytest
from dynamo._core import DistributedRuntime, KvPushRouter, KvRouterConfig from dynamo._core import DistributedRuntime, KvPushRouter, KvRouterConfig
from tests.utils.constants import ROUTER_MODEL_NAME
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
pytestmark = pytest.mark.pre_merge pytestmark = pytest.mark.pre_merge
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
MODEL_NAME = "Qwen/Qwen3-0.6B" MODEL_NAME = ROUTER_MODEL_NAME
NUM_MOCKERS = 2 NUM_MOCKERS = 2
BLOCK_SIZE = 16 BLOCK_SIZE = 16
SPEEDUP_RATIO = 10.0 SPEEDUP_RATIO = 10.0
...@@ -322,7 +323,8 @@ async def send_inflight_requests(urls: list, payload: dict, num_requests: int): ...@@ -322,7 +323,8 @@ async def send_inflight_requests(urls: list, payload: dict, num_requests: int):
@pytest.mark.pre_merge @pytest.mark.pre_merge
def test_mocker_kv_router(request, runtime_services): @pytest.mark.model(MODEL_NAME)
def test_mocker_kv_router(request, runtime_services, predownload_tokenizers):
""" """
Test KV router with multiple mocker engine instances. Test KV router with multiple mocker engine instances.
This test doesn't require GPUs and runs quickly for pre-merge validation. This test doesn't require GPUs and runs quickly for pre-merge validation.
...@@ -379,7 +381,8 @@ def test_mocker_kv_router(request, runtime_services): ...@@ -379,7 +381,8 @@ def test_mocker_kv_router(request, runtime_services):
@pytest.mark.pre_merge @pytest.mark.pre_merge
def test_mocker_two_kv_router(request, runtime_services): @pytest.mark.model(MODEL_NAME)
def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers):
""" """
Test with two KV routers and multiple mocker engine instances. Test with two KV routers and multiple mocker engine instances.
Alternates requests between the two routers to test load distribution. Alternates requests between the two routers to test load distribution.
...@@ -446,8 +449,11 @@ def test_mocker_two_kv_router(request, runtime_services): ...@@ -446,8 +449,11 @@ def test_mocker_two_kv_router(request, runtime_services):
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.model(MODEL_NAME)
@pytest.mark.skip(reason="Flaky, temporarily disabled") @pytest.mark.skip(reason="Flaky, temporarily disabled")
def test_mocker_kv_router_overload_503(request, runtime_services): def test_mocker_kv_router_overload_503(
request, runtime_services, predownload_tokenizers
):
""" """
Test that KV router returns 503 when all workers are busy. Test that KV router returns 503 when all workers are busy.
This test uses limited resources to intentionally trigger the overload condition. This test uses limited resources to intentionally trigger the overload condition.
...@@ -612,7 +618,8 @@ def test_mocker_kv_router_overload_503(request, runtime_services): ...@@ -612,7 +618,8 @@ def test_mocker_kv_router_overload_503(request, runtime_services):
@pytest.mark.pre_merge @pytest.mark.pre_merge
def test_kv_push_router_bindings(request, runtime_services): @pytest.mark.model(MODEL_NAME)
def test_kv_push_router_bindings(request, runtime_services, predownload_tokenizers):
""" """
Test KvPushRouter Python bindings with mocker engines. Test KvPushRouter Python bindings with mocker engines.
This test creates KvPushRouter as a Python object and verifies This test creates KvPushRouter as a Python object and verifies
...@@ -839,7 +846,8 @@ def test_kv_push_router_bindings(request, runtime_services): ...@@ -839,7 +846,8 @@ def test_kv_push_router_bindings(request, runtime_services):
@pytest.mark.pre_merge @pytest.mark.pre_merge
def test_indexers_sync(request, runtime_services): @pytest.mark.model(MODEL_NAME)
def test_indexers_sync(request, runtime_services, predownload_tokenizers):
""" """
Test that two KV routers have synchronized indexer states after processing requests. Test that two KV routers have synchronized indexer states after processing requests.
This test verifies that both routers converge to the same internal state. This test verifies that both routers converge to the same internal state.
...@@ -1066,7 +1074,10 @@ def test_indexers_sync(request, runtime_services): ...@@ -1066,7 +1074,10 @@ def test_indexers_sync(request, runtime_services):
@pytest.mark.pre_merge @pytest.mark.pre_merge
def test_query_instance_id_returns_worker_and_tokens(request, runtime_services): @pytest.mark.model(MODEL_NAME)
def test_query_instance_id_returns_worker_and_tokens(
request, runtime_services, predownload_tokenizers
):
""" """
Test that the KV router correctly handles query_instance_id annotation. Test that the KV router correctly handles query_instance_id annotation.
......
...@@ -4,12 +4,16 @@ ...@@ -4,12 +4,16 @@
"""Common base classes and utilities for engine tests (vLLM, TRT-LLM, etc.)""" """Common base classes and utilities for engine tests (vLLM, TRT-LLM, etc.)"""
import logging import logging
from collections.abc import Mapping
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import pytest
from tests.utils.client import send_request from tests.utils.client import send_request
from tests.utils.engine_process import EngineConfig, EngineProcess from tests.utils.engine_process import EngineConfig, EngineProcess
DEFAULT_TIMEOUT = 10 DEFAULT_TIMEOUT = 10
SERVE_TEST_DIR = "/workspace/tests/serve"
def run_serve_deployment( def run_serve_deployment(
...@@ -58,3 +62,16 @@ def run_serve_deployment( ...@@ -58,3 +62,16 @@ def run_serve_deployment(
method=payload_item.method, method=payload_item.method,
) )
server_process.check_response(payload_item, response) server_process.check_response(payload_item, response)
def params_with_model_mark(configs: Mapping[str, EngineConfig]):
    """Return pytest params for a config dict, adding a model marker per param.

    This enables simple model collection after pytest filtering.

    Args:
        configs: Mapping of config name to engine config. Each config's
            ``model`` attribute is read; its ``marks`` attribute is optional.

    Returns:
        A list with one ``pytest.param`` per entry, in mapping order. Each
        param's value is the config name, and its marks are the config's own
        marks followed by ``pytest.mark.model(cfg.model)``.
    """
    return [
        pytest.param(
            config_name,
            # Build a fresh list so the config's own marks are never mutated;
            # a missing or None ``marks`` is treated as "no marks".
            marks=[
                *(getattr(cfg, "marks", None) or ()),
                pytest.mark.model(cfg.model),
            ],
        )
        for config_name, cfg in configs.items()
    ]
...@@ -7,7 +7,11 @@ from dataclasses import dataclass, field ...@@ -7,7 +7,11 @@ from dataclasses import dataclass, field
import pytest import pytest
from tests.serve.common import run_serve_deployment from tests.serve.common import (
SERVE_TEST_DIR,
params_with_model_mark,
run_serve_deployment,
)
from tests.utils.engine_process import EngineConfig from tests.utils.engine_process import EngineConfig
from tests.utils.payload_builder import chat_payload_default, completion_payload_default from tests.utils.payload_builder import chat_payload_default, completion_payload_default
...@@ -26,7 +30,7 @@ sglang_dir = os.environ.get("SGLANG_DIR", "/workspace/components/backends/sglang ...@@ -26,7 +30,7 @@ sglang_dir = os.environ.get("SGLANG_DIR", "/workspace/components/backends/sglang
sglang_configs = { sglang_configs = {
"aggregated": SGLangConfig( "aggregated": SGLangConfig(
name="aggregated", name="aggregated",
directory="/workspace/tests/serve", directory=SERVE_TEST_DIR,
script_name="sglang_agg.sh", script_name="sglang_agg.sh",
marks=[pytest.mark.gpu_1], marks=[pytest.mark.gpu_1],
model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B", model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
...@@ -67,13 +71,7 @@ sglang_configs = { ...@@ -67,13 +71,7 @@ sglang_configs = {
} }
@pytest.fixture( @pytest.fixture(params=params_with_model_mark(sglang_configs))
params=[
pytest.param("aggregated", marks=[pytest.mark.gpu_1]),
pytest.param("disaggregated", marks=[pytest.mark.gpu_2]),
pytest.param("kv_events", marks=[pytest.mark.gpu_2]),
]
)
def sglang_config_test(request): def sglang_config_test(request):
"""Fixture that provides different SGLang test configurations""" """Fixture that provides different SGLang test configurations"""
return sglang_configs[request.param] return sglang_configs[request.param]
...@@ -81,7 +79,9 @@ def sglang_config_test(request): ...@@ -81,7 +79,9 @@ def sglang_config_test(request):
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.sglang @pytest.mark.sglang
def test_sglang_deployment(sglang_config_test, request, runtime_services): def test_sglang_deployment(
sglang_config_test, request, runtime_services, predownload_models
):
"""Test SGLang deployment scenarios using common helpers""" """Test SGLang deployment scenarios using common helpers"""
config = sglang_config_test config = sglang_config_test
run_serve_deployment(config, request) run_serve_deployment(config, request)
...@@ -90,7 +90,7 @@ def test_sglang_deployment(sglang_config_test, request, runtime_services): ...@@ -90,7 +90,7 @@ def test_sglang_deployment(sglang_config_test, request, runtime_services):
@pytest.mark.skip( @pytest.mark.skip(
reason="Requires 4 GPUs - enable when hardware is consistently available" reason="Requires 4 GPUs - enable when hardware is consistently available"
) )
def test_sglang_disagg_dp_attention(request, runtime_services): def test_sglang_disagg_dp_attention(request, runtime_services, predownload_models):
"""Test sglang disaggregated with DP attention (requires 4 GPUs)""" """Test sglang disaggregated with DP attention (requires 4 GPUs)"""
# Kept for reference; this test uses a different launch path and is skipped # Kept for reference; this test uses a different launch path and is skipped
...@@ -7,7 +7,7 @@ from dataclasses import dataclass, field ...@@ -7,7 +7,7 @@ from dataclasses import dataclass, field
import pytest import pytest
from tests.serve.common import run_serve_deployment from tests.serve.common import params_with_model_mark, run_serve_deployment
from tests.utils.engine_process import EngineConfig from tests.utils.engine_process import EngineConfig
from tests.utils.payload_builder import chat_payload_default, completion_payload_default from tests.utils.payload_builder import chat_payload_default, completion_payload_default
...@@ -84,12 +84,7 @@ trtllm_configs = { ...@@ -84,12 +84,7 @@ trtllm_configs = {
} }
@pytest.fixture( @pytest.fixture(params=params_with_model_mark(trtllm_configs))
params=[
pytest.param(config_name, marks=config.marks)
for config_name, config in trtllm_configs.items()
]
)
def trtllm_config_test(request): def trtllm_config_test(request):
"""Fixture that provides different trtllm test configurations""" """Fixture that provides different trtllm test configurations"""
return trtllm_configs[request.param] return trtllm_configs[request.param]
...@@ -97,7 +92,7 @@ def trtllm_config_test(request): ...@@ -97,7 +92,7 @@ def trtllm_config_test(request):
@pytest.mark.trtllm_marker @pytest.mark.trtllm_marker
@pytest.mark.e2e @pytest.mark.e2e
def test_deployment(trtllm_config_test, request, runtime_services): def test_deployment(trtllm_config_test, request, runtime_services, predownload_models):
""" """
Test dynamo deployments with different configurations. Test dynamo deployments with different configurations.
""" """
...@@ -110,9 +105,8 @@ def test_deployment(trtllm_config_test, request, runtime_services): ...@@ -110,9 +105,8 @@ def test_deployment(trtllm_config_test, request, runtime_services):
@pytest.mark.e2e @pytest.mark.e2e
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.trtllm_marker @pytest.mark.trtllm_marker
@pytest.mark.slow
def test_chat_only_aggregated_with_test_logits_processor( def test_chat_only_aggregated_with_test_logits_processor(
request, runtime_services, monkeypatch request, runtime_services, predownload_models, monkeypatch
): ):
""" """
Run a single aggregated chat-completions test using Qwen 0.6B with the Run a single aggregated chat-completions test using Qwen 0.6B with the
......
...@@ -7,7 +7,7 @@ from dataclasses import dataclass, field ...@@ -7,7 +7,7 @@ from dataclasses import dataclass, field
import pytest import pytest
from tests.serve.common import run_serve_deployment from tests.serve.common import params_with_model_mark, run_serve_deployment
from tests.utils.engine_process import EngineConfig from tests.utils.engine_process import EngineConfig
from tests.utils.payload_builder import ( from tests.utils.payload_builder import (
chat_payload, chat_payload,
...@@ -185,12 +185,7 @@ vllm_configs = { ...@@ -185,12 +185,7 @@ vllm_configs = {
} }
@pytest.fixture( @pytest.fixture(params=params_with_model_mark(vllm_configs))
params=[
pytest.param(config_name, marks=config.marks)
for config_name, config in vllm_configs.items()
]
)
def vllm_config_test(request): def vllm_config_test(request):
"""Fixture that provides different vLLM test configurations""" """Fixture that provides different vLLM test configurations"""
return vllm_configs[request.param] return vllm_configs[request.param]
...@@ -198,7 +193,9 @@ def vllm_config_test(request): ...@@ -198,7 +193,9 @@ def vllm_config_test(request):
@pytest.mark.vllm @pytest.mark.vllm
@pytest.mark.e2e @pytest.mark.e2e
def test_serve_deployment(vllm_config_test, request, runtime_services): def test_serve_deployment(
vllm_config_test, request, runtime_services, predownload_models
):
""" """
Test dynamo serve deployments with different graph configurations. Test dynamo serve deployments with different graph configurations.
""" """
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Shared test constants.
Centralize model identifiers and other shared constants for tests to
avoid importing from conftest and to keep values consistent.
"""
import os
# Model identifiers referenced by the tests.
QWEN = "Qwen/Qwen3-0.6B"
# NOTE: on an L4 GPU, --max-seq-len must be limited for this model,
# otherwise it will not fit.
LLAMA = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"

# Every model identifier defined above, gathered in a single list.
TEST_MODELS = [QWEN, LLAMA]

# Per-test-group model names. Each one reads the environment variable of the
# same name and falls back to QWEN when that variable is not set.
ROUTER_MODEL_NAME = os.getenv("ROUTER_MODEL_NAME", QWEN)
FAULT_TOLERANCE_MODEL_NAME = os.getenv("FAULT_TOLERANCE_MODEL_NAME", QWEN)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment