Unverified commit df7e1271, authored by Andreas Karatzas, committed by GitHub
Browse files

[ROCm][CI] Fix engine core client tests for ROCm spawn multiprocessing (#32061)


Signed-off-by: Andreas Karatzas <akaratza@amd.com>
parent 44c34f22
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
import asyncio import asyncio
import importlib import importlib
import inspect
import os import os
import signal import signal
import time import time
...@@ -47,6 +48,7 @@ MODEL_NAME = "meta-llama/Llama-3.2-1B-Instruct" ...@@ -47,6 +48,7 @@ MODEL_NAME = "meta-llama/Llama-3.2-1B-Instruct"
TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME) TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME)
PROMPT = "Hello my name is Robert and I love quantization kernels" PROMPT = "Hello my name is Robert and I love quantization kernels"
PROMPT_TOKENS = TOKENIZER(PROMPT).input_ids PROMPT_TOKENS = TOKENIZER(PROMPT).input_ids
TEST_MODULE = "tests.v1.engine.test_engine_core_client"
_REQUEST_COUNTER = 0 _REQUEST_COUNTER = 0
...@@ -223,10 +225,170 @@ def echo(self, msg: str, err_msg: str | None = None, sleep: float | None = None) ...@@ -223,10 +225,170 @@ def echo(self, msg: str, err_msg: str | None = None, sleep: float | None = None)
return msg return msg
@dataclass
class TestMessage:
    """Test dataclass for verifying custom type serialization.

    The ``echo_dc*`` utility functions in this module wrap their ``msg``
    argument in an instance of this class before returning it.
    """

    # The "Test" name prefix matches pytest's default class-collection
    # pattern. Opt out explicitly so pytest does not try to collect this
    # helper (and warn that it cannot because it has an ``__init__``).
    # This is an un-annotated class attribute, so it is not a dataclass field.
    __test__ = False

    # Text payload carried by the message.
    message: str
# Dummy utility function to monkey-patch into engine core.
def echo_dc(
    self,
    msg: str | None,
    return_list: bool = False,
) -> TestMessage | None | list[TestMessage | None]:
    """Echo ``msg`` back wrapped in a ``TestMessage``.

    Args:
        msg: Text to wrap. ``None`` is passed through unwrapped.
        return_list: When true, return a list holding the value three
            times instead of the bare value.

    Returns:
        The wrapped message (or ``None`` when ``msg`` is ``None``), or a
        three-element list of that value when ``return_list`` is set.
    """
    print(f"echo dc util function called: {msg}")
    val = None if msg is None else TestMessage(msg)
    # Return dataclass to verify support for returning custom types
    # (for which there is special handling to make it work with msgspec).
    return [val for _ in range(3)] if return_list else val
# Dummy utility function to test dict serialization with custom types.
def echo_dc_dict(
    self,
    msg: str | None,
    return_dict: bool = False,
) -> TestMessage | None | dict[str, TestMessage | None]:
    """Echo ``msg`` back as a ``TestMessage``, optionally inside a dict.

    Args:
        msg: Text to wrap. ``None`` is passed through unwrapped.
        return_dict: When true, return a dict mapping ``"key1"``,
            ``"key2"`` and ``"key3"`` to the value instead of the bare
            value.

    Returns:
        The wrapped message (or ``None`` when ``msg`` is ``None``), or a
        three-entry dict of that value when ``return_dict`` is set.
    """
    print(f"echo dc dict util function called: {msg}")
    val = None if msg is None else TestMessage(msg)
    # Return dict of dataclasses to verify support for returning dicts
    # with custom value types.
    if return_dict:
        return {"key1": val, "key2": val, "key3": val}
    return val
# Dummy utility function to test nested structures with custom types.
def echo_dc_nested(
    self,
    msg: str,
    structure_type: str = "list_of_dicts",
) -> Any:
    """Echo ``msg`` back as a ``TestMessage`` embedded in a nested container.

    Args:
        msg: Text to wrap. ``None`` is passed through unwrapped.
        structure_type: Name of the container shape to return. One of
            ``"list_of_dicts"``, ``"dict_of_lists"`` or ``"deep_nested"``.

    Returns:
        The requested nested structure, or the bare value when
        ``structure_type`` is not one of the known names.
    """
    print(f"echo dc nested util function called: {msg}, structure: {structure_type}")
    payload = TestMessage(msg) if msg is not None else None
    shapes = {
        "list_of_dicts": [
            {"a": payload, "b": payload},
            {"c": payload, "d": payload},
        ],
        "dict_of_lists": {
            "list1": [payload, payload],
            "list2": [payload, payload],
        },
        "deep_nested": {
            "outer": [
                {"inner": [payload, payload]},
                {"inner": [payload]},
            ],
        },
    }
    try:
        return shapes[structure_type]
    except KeyError:
        # Unknown shape name: fall back to the bare value.
        return payload
# --- Fixtures for subprocess patching ---
# These create sitecustomize.py files that patch EngineCore in spawned
# subprocesses. This is necessary because ROCm requires 'spawn' multiprocessing
# start method, which creates fresh Python interpreters that don't inherit
# monkey-patches from the parent process.
@pytest.fixture
def subprocess_echo_patch(monkeypatch, tmp_path):
    """Expose ``EngineCore.echo`` inside spawned engine subprocesses.

    ROCm uses the 'spawn' multiprocessing start method, so child processes
    are fresh Python interpreters that do not inherit monkey-patches applied
    in the parent. This fixture writes a ``sitecustomize.py`` into a
    temporary directory and puts that directory on ``PYTHONPATH`` so the
    patch is applied when Python starts.
    """
    # Lines of the generated module: the imports it needs, the source of
    # ``echo`` copied verbatim, then the assignment that installs it.
    patch_lines = [
        "import time",
        "from vllm.v1.engine.core import EngineCore",
        inspect.getsource(echo),
        "EngineCore.echo = echo",
    ]
    (tmp_path / "sitecustomize.py").write_text("\n".join(patch_lines))

    # Put the temp dir first, keeping any PYTHONPATH that is already set.
    # An unset or empty value is skipped so no stray separator is added.
    search_path = [str(tmp_path)]
    inherited = os.getenv("PYTHONPATH")
    if inherited:
        search_path.append(inherited)
    monkeypatch.setenv("PYTHONPATH", os.pathsep.join(search_path))
@pytest.fixture
def subprocess_echo_dc_patch(monkeypatch, tmp_path):
    """Expose ``EngineCore.echo_dc`` inside spawned engine subprocesses.

    Writes a ``sitecustomize.py`` into ``tmp_path`` that re-creates
    ``TestMessage`` and ``echo_dc`` from their source and attaches
    ``echo_dc`` to ``EngineCore``, then puts ``tmp_path`` at the front of
    ``PYTHONPATH``.
    """
    patch_lines = [
        "from dataclasses import dataclass",
        "",
        inspect.getsource(TestMessage),
        # NOTE(review): presumably rebinds __module__ so instances created
        # in the subprocess resolve to this test module's class when
        # deserialized in the parent. Confirm against the serializer.
        f"TestMessage.__module__ = '{TEST_MODULE}'",
        "",
        "from vllm.v1.engine.core import EngineCore",
        inspect.getsource(echo_dc),
        "EngineCore.echo_dc = echo_dc",
    ]
    (tmp_path / "sitecustomize.py").write_text("\n".join(patch_lines))

    # Put the temp dir first, keeping any PYTHONPATH that is already set.
    search_path = [str(tmp_path)]
    inherited = os.getenv("PYTHONPATH")
    if inherited:
        search_path.append(inherited)
    monkeypatch.setenv("PYTHONPATH", os.pathsep.join(search_path))
@pytest.fixture
def subprocess_echo_dc_dict_patch(monkeypatch, tmp_path):
    """Expose ``EngineCore.echo_dc_dict`` inside spawned engine subprocesses.

    Writes a ``sitecustomize.py`` into ``tmp_path`` that re-creates
    ``TestMessage`` and ``echo_dc_dict`` from their source and attaches
    ``echo_dc_dict`` to ``EngineCore``, then puts ``tmp_path`` at the front
    of ``PYTHONPATH``.
    """
    patch_lines = [
        "from dataclasses import dataclass",
        "",
        inspect.getsource(TestMessage),
        # NOTE(review): presumably rebinds __module__ so instances created
        # in the subprocess resolve to this test module's class when
        # deserialized in the parent. Confirm against the serializer.
        f"TestMessage.__module__ = '{TEST_MODULE}'",
        "",
        "from vllm.v1.engine.core import EngineCore",
        inspect.getsource(echo_dc_dict),
        "EngineCore.echo_dc_dict = echo_dc_dict",
    ]
    (tmp_path / "sitecustomize.py").write_text("\n".join(patch_lines))

    # Put the temp dir first, keeping any PYTHONPATH that is already set.
    search_path = [str(tmp_path)]
    inherited = os.getenv("PYTHONPATH")
    if inherited:
        search_path.append(inherited)
    monkeypatch.setenv("PYTHONPATH", os.pathsep.join(search_path))
@pytest.fixture
def subprocess_echo_dc_nested_patch(monkeypatch, tmp_path):
    """Expose ``EngineCore.echo_dc_nested`` inside spawned engine subprocesses.

    Writes a ``sitecustomize.py`` into ``tmp_path`` that re-creates
    ``TestMessage`` and ``echo_dc_nested`` from their source and attaches
    ``echo_dc_nested`` to ``EngineCore``, then puts ``tmp_path`` at the
    front of ``PYTHONPATH``.
    """
    patch_lines = [
        "from dataclasses import dataclass",
        # ``echo_dc_nested`` is annotated with ``Any``, so the generated
        # module has to import it as well.
        "from typing import Any",
        "",
        inspect.getsource(TestMessage),
        # NOTE(review): presumably rebinds __module__ so instances created
        # in the subprocess resolve to this test module's class when
        # deserialized in the parent. Confirm against the serializer.
        f"TestMessage.__module__ = '{TEST_MODULE}'",
        "",
        "from vllm.v1.engine.core import EngineCore",
        inspect.getsource(echo_dc_nested),
        "EngineCore.echo_dc_nested = echo_dc_nested",
    ]
    (tmp_path / "sitecustomize.py").write_text("\n".join(patch_lines))

    # Put the temp dir first, keeping any PYTHONPATH that is already set.
    search_path = [str(tmp_path)]
    inherited = os.getenv("PYTHONPATH")
    if inherited:
        search_path.append(inherited)
    monkeypatch.setenv("PYTHONPATH", os.pathsep.join(search_path))
@create_new_process_for_each_test() @create_new_process_for_each_test()
@pytest.mark.parametrize("multiprocessing_mode", [True, False]) @pytest.mark.parametrize("multiprocessing_mode", [True, False])
def test_engine_core_client( def test_engine_core_client(
monkeypatch: pytest.MonkeyPatch, multiprocessing_mode: bool monkeypatch: pytest.MonkeyPatch,
multiprocessing_mode: bool,
subprocess_echo_patch,
): ):
with monkeypatch.context() as m: with monkeypatch.context() as m:
# Monkey-patch core engine utility function to test. # Monkey-patch core engine utility function to test.
...@@ -313,7 +475,10 @@ def test_engine_core_client( ...@@ -313,7 +475,10 @@ def test_engine_core_client(
@pytest.mark.asyncio(loop_scope="function") @pytest.mark.asyncio(loop_scope="function")
async def test_engine_core_client_asyncio(monkeypatch: pytest.MonkeyPatch): async def test_engine_core_client_asyncio(
monkeypatch: pytest.MonkeyPatch,
subprocess_echo_patch,
):
with monkeypatch.context() as m: with monkeypatch.context() as m:
# Monkey-patch core engine utility function to test. # Monkey-patch core engine utility function to test.
m.setattr(EngineCore, "echo", echo, raising=False) m.setattr(EngineCore, "echo", echo, raising=False)
...@@ -406,66 +571,10 @@ async def test_engine_core_client_asyncio(monkeypatch: pytest.MonkeyPatch): ...@@ -406,66 +571,10 @@ async def test_engine_core_client_asyncio(monkeypatch: pytest.MonkeyPatch):
client.shutdown() client.shutdown()
@dataclass
class MyDataclass:
    """Minimal custom dataclass returned by the ``echo_dc*`` test utilities."""

    # Text carried by the instance; the echo utilities construct it from
    # their ``msg`` argument.
    message: str
# Dummy utility function to monkey-patch into engine core.
def echo_dc(
    self,
    msg: str | None,
    return_list: bool = False,
) -> MyDataclass | None | list[MyDataclass | None]:
    """Echo ``msg`` back wrapped in a ``MyDataclass``.

    Args:
        msg: Text to wrap. ``None`` is passed through unwrapped.
        return_list: When true, return a list holding the value three
            times instead of the bare value.

    Returns:
        The wrapped message (or ``None`` when ``msg`` is ``None``), or a
        three-element list of that value when ``return_list`` is set.
    """
    print(f"echo dc util function called: {msg}")
    val = None if msg is None else MyDataclass(msg)
    # Return dataclass to verify support for returning custom types
    # (for which there is special handling to make it work with msgspec).
    return [val for _ in range(3)] if return_list else val
# Dummy utility function to test dict serialization with custom types.
def echo_dc_dict(
    self,
    msg: str | None,
    return_dict: bool = False,
) -> MyDataclass | None | dict[str, MyDataclass | None]:
    """Echo ``msg`` back as a ``MyDataclass``, optionally inside a dict.

    Args:
        msg: Text to wrap. ``None`` is passed through unwrapped.
        return_dict: When true, return a dict mapping ``"key1"``,
            ``"key2"`` and ``"key3"`` to the value instead of the bare
            value.

    Returns:
        The wrapped message (or ``None`` when ``msg`` is ``None``), or a
        three-entry dict of that value when ``return_dict`` is set.
    """
    print(f"echo dc dict util function called: {msg}")
    val = None if msg is None else MyDataclass(msg)
    # Return dict of dataclasses to verify support for returning dicts
    # with custom value types.
    if return_dict:
        return {"key1": val, "key2": val, "key3": val}
    return val
# Dummy utility function to test nested structures with custom types.
def echo_dc_nested(
    self,
    msg: str,
    structure_type: str = "list_of_dicts",
) -> Any:
    """Echo ``msg`` back as a ``MyDataclass`` embedded in a nested container.

    Args:
        msg: Text to wrap. ``None`` is passed through unwrapped.
        structure_type: Name of the container shape to return. One of
            ``"list_of_dicts"``, ``"dict_of_lists"`` or ``"deep_nested"``.

    Returns:
        The requested nested structure, or the bare value when
        ``structure_type`` is not one of the known names.
    """
    print(f"echo dc nested util function called: {msg}, structure: {structure_type}")
    payload = MyDataclass(msg) if msg is not None else None

    if structure_type == "list_of_dicts":  # noqa
        # Two dicts in a list, each mapping two keys to the payload.
        return [{"a": payload, "b": payload}, {"c": payload, "d": payload}]

    if structure_type == "dict_of_lists":
        # Two keys, each mapping to a two-element list of the payload.
        return {"list1": [payload, payload], "list2": [payload, payload]}

    if structure_type == "deep_nested":
        # dict -> list -> dict -> list, with two and then one payload.
        return {"outer": [{"inner": [payload, payload]}, {"inner": [payload]}]}

    # Unknown shape name: fall back to the bare value.
    return payload
@pytest.mark.asyncio(loop_scope="function") @pytest.mark.asyncio(loop_scope="function")
async def test_engine_core_client_util_method_custom_return( async def test_engine_core_client_util_method_custom_return(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
subprocess_echo_dc_patch,
): ):
with monkeypatch.context() as m: with monkeypatch.context() as m:
# Must set insecure serialization to allow returning custom types. # Must set insecure serialization to allow returning custom types.
...@@ -494,10 +603,10 @@ async def test_engine_core_client_util_method_custom_return( ...@@ -494,10 +603,10 @@ async def test_engine_core_client_util_method_custom_return(
core_client: AsyncMPClient = client core_client: AsyncMPClient = client
result = await core_client.call_utility_async("echo_dc", "testarg2", False) result = await core_client.call_utility_async("echo_dc", "testarg2", False)
assert isinstance(result, MyDataclass) and result.message == "testarg2" assert isinstance(result, TestMessage) and result.message == "testarg2"
result = await core_client.call_utility_async("echo_dc", "testarg2", True) result = await core_client.call_utility_async("echo_dc", "testarg2", True)
assert isinstance(result, list) and all( assert isinstance(result, list) and all(
isinstance(r, MyDataclass) and r.message == "testarg2" for r in result isinstance(r, TestMessage) and r.message == "testarg2" for r in result
) )
# Test returning None and list of Nones # Test returning None and list of Nones
...@@ -513,6 +622,7 @@ async def test_engine_core_client_util_method_custom_return( ...@@ -513,6 +622,7 @@ async def test_engine_core_client_util_method_custom_return(
@pytest.mark.asyncio(loop_scope="function") @pytest.mark.asyncio(loop_scope="function")
async def test_engine_core_client_util_method_custom_dict_return( async def test_engine_core_client_util_method_custom_dict_return(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
subprocess_echo_dc_dict_patch,
): ):
with monkeypatch.context() as m: with monkeypatch.context() as m:
# Must set insecure serialization to allow returning custom types. # Must set insecure serialization to allow returning custom types.
...@@ -544,7 +654,7 @@ async def test_engine_core_client_util_method_custom_dict_return( ...@@ -544,7 +654,7 @@ async def test_engine_core_client_util_method_custom_dict_return(
result = await core_client.call_utility_async( result = await core_client.call_utility_async(
"echo_dc_dict", "testarg3", False "echo_dc_dict", "testarg3", False
) )
assert isinstance(result, MyDataclass) and result.message == "testarg3" assert isinstance(result, TestMessage) and result.message == "testarg3"
# Test dict return with custom value types # Test dict return with custom value types
result = await core_client.call_utility_async( result = await core_client.call_utility_async(
...@@ -553,7 +663,7 @@ async def test_engine_core_client_util_method_custom_dict_return( ...@@ -553,7 +663,7 @@ async def test_engine_core_client_util_method_custom_dict_return(
assert isinstance(result, dict) and len(result) == 3 assert isinstance(result, dict) and len(result) == 3
for key, val in result.items(): for key, val in result.items():
assert key in ["key1", "key2", "key3"] assert key in ["key1", "key2", "key3"]
assert isinstance(val, MyDataclass) and val.message == "testarg3" assert isinstance(val, TestMessage) and val.message == "testarg3"
# Test returning dict with None values # Test returning dict with None values
result = await core_client.call_utility_async("echo_dc_dict", None, True) result = await core_client.call_utility_async("echo_dc_dict", None, True)
...@@ -569,6 +679,7 @@ async def test_engine_core_client_util_method_custom_dict_return( ...@@ -569,6 +679,7 @@ async def test_engine_core_client_util_method_custom_dict_return(
@pytest.mark.asyncio(loop_scope="function") @pytest.mark.asyncio(loop_scope="function")
async def test_engine_core_client_util_method_nested_structures( async def test_engine_core_client_util_method_nested_structures(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
subprocess_echo_dc_nested_patch,
): ):
with monkeypatch.context() as m: with monkeypatch.context() as m:
# Must set insecure serialization to allow returning custom types. # Must set insecure serialization to allow returning custom types.
...@@ -605,21 +716,21 @@ async def test_engine_core_client_util_method_nested_structures( ...@@ -605,21 +716,21 @@ async def test_engine_core_client_util_method_nested_structures(
if i == 0: if i == 0:
assert "a" in item and "b" in item assert "a" in item and "b" in item
assert ( assert (
isinstance(item["a"], MyDataclass) isinstance(item["a"], TestMessage)
and item["a"].message == "nested1" and item["a"].message == "nested1"
) )
assert ( assert (
isinstance(item["b"], MyDataclass) isinstance(item["b"], TestMessage)
and item["b"].message == "nested1" and item["b"].message == "nested1"
) )
else: else:
assert "c" in item and "d" in item assert "c" in item and "d" in item
assert ( assert (
isinstance(item["c"], MyDataclass) isinstance(item["c"], TestMessage)
and item["c"].message == "nested1" and item["c"].message == "nested1"
) )
assert ( assert (
isinstance(item["d"], MyDataclass) isinstance(item["d"], TestMessage)
and item["d"].message == "nested1" and item["d"].message == "nested1"
) )
...@@ -632,7 +743,7 @@ async def test_engine_core_client_util_method_nested_structures( ...@@ -632,7 +743,7 @@ async def test_engine_core_client_util_method_nested_structures(
for key, lst in result.items(): for key, lst in result.items():
assert isinstance(lst, list) and len(lst) == 2 assert isinstance(lst, list) and len(lst) == 2
for item in lst: for item in lst:
assert isinstance(item, MyDataclass) and item.message == "nested2" assert isinstance(item, TestMessage) and item.message == "nested2"
# Test deeply nested: {"outer": [{"inner": [val, val]}, # Test deeply nested: {"outer": [{"inner": [val, val]},
# {"inner": [val]}]} # {"inner": [val]}]}
...@@ -649,7 +760,7 @@ async def test_engine_core_client_util_method_nested_structures( ...@@ -649,7 +760,7 @@ async def test_engine_core_client_util_method_nested_structures(
inner_list1 = inner_dict1["inner"] inner_list1 = inner_dict1["inner"]
assert isinstance(inner_list1, list) and len(inner_list1) == 2 assert isinstance(inner_list1, list) and len(inner_list1) == 2
for item in inner_list1: for item in inner_list1:
assert isinstance(item, MyDataclass) and item.message == "nested3" assert isinstance(item, TestMessage) and item.message == "nested3"
# Second dict in outer list should have "inner" with 1 item # Second dict in outer list should have "inner" with 1 item
inner_dict2 = outer_list[1] inner_dict2 = outer_list[1]
...@@ -657,7 +768,7 @@ async def test_engine_core_client_util_method_nested_structures( ...@@ -657,7 +768,7 @@ async def test_engine_core_client_util_method_nested_structures(
inner_list2 = inner_dict2["inner"] inner_list2 = inner_dict2["inner"]
assert isinstance(inner_list2, list) and len(inner_list2) == 1 assert isinstance(inner_list2, list) and len(inner_list2) == 1
assert ( assert (
isinstance(inner_list2[0], MyDataclass) isinstance(inner_list2[0], TestMessage)
and inner_list2[0].message == "nested3" and inner_list2[0].message == "nested3"
) )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment