Unverified Commit d821a8b9 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: parallelize planner profile tests + bindings test cleanup (#4532)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 65f18884
......@@ -29,10 +29,7 @@ pytestmark = pytest.mark.pre_merge
@pytest.fixture
async def distributed_runtime():
"""Function-scoped runtime fixture for use with @pytest.mark.forked tests.
Each test gets its own runtime in a forked process to avoid singleton conflicts.
"""
"""Function-scoped runtime fixture for distributed runtime tests."""
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, "etcd", "nats")
yield runtime
......@@ -40,7 +37,6 @@ async def distributed_runtime():
@pytest.mark.asyncio
@pytest.mark.forked
async def test_radix_tree_binding(distributed_runtime):
"""Test RadixTree binding directly with store event and find matches"""
import json
......@@ -107,7 +103,6 @@ async def test_radix_tree_binding(distributed_runtime):
@pytest.mark.asyncio
@pytest.mark.forked
@pytest.mark.parametrize("num_threads", [2, 3, 5, 128])
@pytest.mark.parametrize("prepopulate_worker_ids", [True, False])
@pytest.mark.parametrize("expiration_duration_secs", [None])
......@@ -209,15 +204,7 @@ async def test_radix_tree_thread_safety(
), f"Expected {expected_blocks_after_removal} block events after removal, got {len(blocks_after_removal)}"
# TODO Figure out how to test with different kv_block_size
# Right now I get an error in EventPublisher init when I run this test
# back to back. It occurs when calling dynamo_llm_init and I think is related to the
# OnceCell initializations not being reset.
# The test works individually if I run it with 32, then 11, then 64.
# @pytest.mark.parametrize("kv_block_size", [11, 32, 64])
@pytest.mark.asyncio
@pytest.mark.forked
@pytest.mark.skip(reason="Flakey in CI. Likely race condition going on.")
async def test_event_handler(distributed_runtime):
kv_block_size = 32
namespace = "kv_test"
......@@ -225,7 +212,10 @@ async def test_event_handler(distributed_runtime):
kv_listener = distributed_runtime.namespace(namespace).component(component)
# publisher
worker_id = 233
# Get actual worker_id from component (KvEventPublisher ignores the passed worker_id and uses component's connection_id)
# Create a dummy endpoint to access connection_id since Component doesn't expose it directly
dummy_endpoint = kv_listener.endpoint("dummy")
worker_id = dummy_endpoint.connection_id()
event_publisher = EventPublisher(kv_listener, worker_id, kv_block_size)
# indexer
......@@ -237,44 +227,26 @@ async def test_event_handler(distributed_runtime):
assert not scores.scores
event_publisher.store_event(test_token, lora_id)
# wait for the event to be processed as it is sent asynchronously
# Retry loop for CI environments where processing may take longer
# Wait for the event to be processed (sent asynchronously)
await asyncio.sleep(0.2)
scores = await indexer.find_matches_for_request(test_token, lora_id)
worker_key = (worker_id, 0) # (worker_id, dp_rank)
for retry in range(10): # Try up to 10 times
await asyncio.sleep(0.5) # Wait 500ms between retries
scores = await indexer.find_matches_for_request(test_token, lora_id)
if (
scores.scores
and worker_key in scores.scores
and scores.scores[worker_key] == 1
):
break
if retry == 9: # Last iteration
# Provide detailed error message for debugging
assert scores.scores, f"No scores found after {(retry+1)*0.5}s"
assert (
worker_key in scores.scores
), f"Worker {worker_key} not in scores after {(retry+1)*0.5}s"
assert (
scores.scores[worker_key] == 1
), f"Expected score 1, got {scores.scores.get(worker_key)} after {(retry+1)*0.5}s"
# remove event
assert scores.scores, "No scores found"
assert worker_key in scores.scores, f"Worker {worker_key} not found in scores"
assert (
scores.scores[worker_key] == 1
), f"Expected score 1, got {scores.scores[worker_key]}"
# Remove event and verify
event_publisher.remove_event()
# Retry loop for event removal verification
for retry in range(10): # Try up to 10 times
await asyncio.sleep(0.5) # Wait 500ms between retries
scores = await indexer.find_matches_for_request(test_token, lora_id)
if not scores.scores:
break
if retry == 9: # Last iteration
assert (
not scores.scores
), f"Scores still present after {(retry+1)*0.5}s: {scores.scores}"
await asyncio.sleep(0.2)
scores = await indexer.find_matches_for_request(test_token, lora_id)
assert not scores.scores, f"Scores still present: {scores.scores}"
@pytest.mark.asyncio
@pytest.mark.forked
async def test_approx_kv_indexer(distributed_runtime):
kv_block_size = 32
namespace = "kv_test"
......
......@@ -5,15 +5,16 @@
import os
import pytest
import uvloop
from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime import DistributedRuntime
TEST_END_TO_END = os.environ.get("TEST_END_TO_END", 0)
@dynamo_worker()
@pytest.mark.asyncio
async def test_register(runtime: DistributedRuntime):
component = runtime.namespace("test").component("tensor")
......
......@@ -37,15 +37,16 @@ class TestProfileSlaAiconfigurator:
"""Test class for profile_sla aiconfigurator functionality."""
@pytest.fixture
def trtllm_args(self):
def trtllm_args(self, request):
class Args:
def __init__(self):
self.model = ""
self.dgd_image = ""
self.backend = "trtllm"
self.config = "examples/backends/trtllm/deploy/disagg.yaml"
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
# Use unique output directory per test for parallel execution
self.output_dir = f"/tmp/test_profiling_results_{request.node.name}"
self.namespace = f"test-namespace-{request.node.name}"
self.min_num_gpus_per_engine = 1
self.max_num_gpus_per_engine = 8
self.skip_existing_results = False
......@@ -76,6 +77,7 @@ class TestProfileSlaAiconfigurator:
return Args()
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
@pytest.mark.parametrize("missing_arg", ["aic_system", "aic_hf_id"])
async def test_aiconfigurator_missing_args(self, trtllm_args, missing_arg):
......@@ -86,6 +88,7 @@ class TestProfileSlaAiconfigurator:
await run_profile(trtllm_args)
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
@pytest.mark.parametrize(
"arg_name, bad_value",
......@@ -103,11 +106,13 @@ class TestProfileSlaAiconfigurator:
await run_profile(trtllm_args)
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
async def test_trtllm_aiconfigurator_single_model(self, trtllm_args):
# Test that profile_sla works with the model & backend in the trtllm_args fixture.
await run_profile(trtllm_args)
@pytest.mark.parallel
@pytest.mark.asyncio
@pytest.mark.parametrize(
"backend, aic_backend_version",
......
......@@ -41,15 +41,16 @@ class TestProfileSLADryRun:
"""Test class for profile_sla dry-run functionality."""
@pytest.fixture
def vllm_args(self):
def vllm_args(self, request):
"""Create arguments for vllm backend dry-run test."""
class Args:
def __init__(self):
self.backend = "vllm"
self.config = "examples/backends/vllm/deploy/disagg.yaml"
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
# Use unique output directory per test for parallel execution
self.output_dir = f"/tmp/test_profiling_results_{request.node.name}"
self.namespace = f"test-namespace-{request.node.name}"
self.model = ""
self.dgd_image = ""
self.min_num_gpus_per_engine = 1
......@@ -83,15 +84,16 @@ class TestProfileSLADryRun:
return Args()
@pytest.fixture
def sglang_args(self):
def sglang_args(self, request):
"""Create arguments for sglang backend dry-run test."""
class Args:
def __init__(self):
self.backend = "sglang"
self.config = "examples/backends/sglang/deploy/disagg.yaml"
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
# Use unique output directory per test for parallel execution
self.output_dir = f"/tmp/test_profiling_results_{request.node.name}"
self.namespace = f"test-namespace-{request.node.name}"
self.model = ""
self.dgd_image = ""
self.min_num_gpus_per_engine = 1
......@@ -124,6 +126,7 @@ class TestProfileSLADryRun:
return Args()
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
async def test_vllm_dryrun(self, vllm_args):
"""Test that profile_sla dry-run works for vllm backend with disagg.yaml config."""
......@@ -131,6 +134,7 @@ class TestProfileSLADryRun:
await run_profile(vllm_args)
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
async def test_sglang_dryrun(self, sglang_args):
"""Test that profile_sla dry-run works for sglang backend with disagg.yaml config."""
......@@ -138,15 +142,16 @@ class TestProfileSLADryRun:
await run_profile(sglang_args)
@pytest.fixture
def trtllm_args(self):
def trtllm_args(self, request):
"""Create arguments for trtllm backend dry-run test."""
class Args:
def __init__(self):
self.backend = "trtllm"
self.config = "examples/backends/trtllm/deploy/disagg.yaml"
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
# Use unique output directory per test for parallel execution
self.output_dir = f"/tmp/test_profiling_results_{request.node.name}"
self.namespace = f"test-namespace-{request.node.name}"
self.model = ""
self.dgd_image = ""
self.min_num_gpus_per_engine = 1
......@@ -179,6 +184,7 @@ class TestProfileSLADryRun:
return Args()
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
async def test_trtllm_dryrun(self, trtllm_args):
"""Test that profile_sla dry-run works for trtllm backend with disagg.yaml config."""
......@@ -186,15 +192,16 @@ class TestProfileSLADryRun:
await run_profile(trtllm_args)
@pytest.fixture
def sglang_moe_args(self):
def sglang_moe_args(self, request):
"""Create arguments for trtllm backend dry-run test."""
class Args:
def __init__(self):
self.backend = "sglang"
self.config = "recipes/deepseek-r1/sglang/disagg-16gpu/deploy.yaml"
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
# Use unique output directory per test for parallel execution
self.output_dir = f"/tmp/test_profiling_results_{request.node.name}"
self.namespace = f"test-namespace-{request.node.name}"
self.model = ""
self.dgd_image = ""
self.min_num_gpus_per_engine = 8
......@@ -228,6 +235,7 @@ class TestProfileSLADryRun:
return Args()
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
async def test_sglang_moe_dryrun(self, sglang_moe_args):
"""Test that profile_sla dry-run works for sglang backend with MoE config."""
......@@ -255,15 +263,16 @@ class TestProfileSLADryRun:
)
@pytest.fixture
def vllm_args_with_model_autogen(self):
def vllm_args_with_model_autogen(self, request):
"""Create arguments for vllm backend with model-based search space autogeneration."""
class Args:
def __init__(self):
self.backend = "vllm"
self.config = ""
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
# Use unique output directory per test for parallel execution
self.output_dir = f"/tmp/test_profiling_results_{request.node.name}"
self.namespace = f"test-namespace-{request.node.name}"
self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # Specify model for autogen
self.dgd_image = ""
# Set to 0 to trigger auto-generation path
......@@ -293,6 +302,7 @@ class TestProfileSLADryRun:
return Args()
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
@patch("benchmarks.profiler.utils.search_space_autogen.get_gpu_summary")
@patch("benchmarks.profiler.utils.search_space_autogen.get_model_info")
......@@ -319,15 +329,16 @@ class TestProfileSLADryRun:
await run_profile(vllm_args_with_model_autogen)
@pytest.fixture
def sglang_args_with_model_autogen(self):
def sglang_args_with_model_autogen(self, request):
"""Create arguments for sglang backend with model-based search space autogeneration."""
class Args:
def __init__(self):
self.backend = "sglang"
self.config = ""
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
# Use unique output directory per test for parallel execution
self.output_dir = f"/tmp/test_profiling_results_{request.node.name}"
self.namespace = f"test-namespace-{request.node.name}"
self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # Specify model for autogen
self.dgd_image = ""
self.min_num_gpus_per_engine = 0
......@@ -355,6 +366,7 @@ class TestProfileSLADryRun:
return Args()
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
@patch("benchmarks.profiler.utils.search_space_autogen.get_gpu_summary")
@patch("benchmarks.profiler.utils.search_space_autogen.get_model_info")
......@@ -381,15 +393,16 @@ class TestProfileSLADryRun:
await run_profile(sglang_args_with_model_autogen)
@pytest.fixture
def trtllm_args_with_model_autogen(self):
def trtllm_args_with_model_autogen(self, request):
"""Create arguments for trtllm backend with model-based search space autogeneration."""
class Args:
def __init__(self):
self.backend = "trtllm"
self.config = ""
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
# Use unique output directory per test for parallel execution
self.output_dir = f"/tmp/test_profiling_results_{request.node.name}"
self.namespace = f"test-namespace-{request.node.name}"
self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # Specify model for autogen
self.dgd_image = ""
self.min_num_gpus_per_engine = 0
......@@ -417,6 +430,7 @@ class TestProfileSLADryRun:
return Args()
@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.asyncio
@patch("benchmarks.profiler.utils.search_space_autogen.get_gpu_summary")
@patch("benchmarks.profiler.utils.search_space_autogen.get_model_info")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment