test_custom_executor.py 2.65 KB
Newer Older
1
2
import asyncio
import os
3
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
4
5
6
7
8
9

import pytest

from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.engine.llm_engine import LLMEngine
10
from vllm.executor.uniproc_executor import UniProcExecutor
11
12
13
14
15
16
17
from vllm.sampling_params import SamplingParams


class Mock:
    ...


18
class CustomUniExecutor(UniProcExecutor):
19

20
    def collective_rpc(self,
21
                       method: Union[str, Callable],
22
23
24
                       timeout: Optional[float] = None,
                       args: Tuple = (),
                       kwargs: Optional[Dict] = None) -> List[Any]:
25
26
27
        # Drop marker to show that this was ran
        with open(".marker", "w"):
            ...
28
        return super().collective_rpc(method, timeout, args, kwargs)
29
30


31
CustomUniExecutorAsync = CustomUniExecutor
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46


@pytest.mark.parametrize("model", ["facebook/opt-125m"])
def test_custom_executor_type_checking(model):
    with pytest.raises(ValueError):
        engine_args = EngineArgs(model=model,
                                 distributed_executor_backend=Mock)
        LLMEngine.from_engine_args(engine_args)
    with pytest.raises(ValueError):
        engine_args = AsyncEngineArgs(model=model,
                                      distributed_executor_backend=Mock)
        AsyncLLMEngine.from_engine_args(engine_args)


@pytest.mark.parametrize("model", ["facebook/opt-125m"])
47
def test_custom_executor(model, tmp_path):
48
    cwd = os.path.abspath(".")
49
    os.chdir(tmp_path)
50
51
52
53
    try:
        assert not os.path.exists(".marker")

        engine_args = EngineArgs(
54
            model=model, distributed_executor_backend=CustomUniExecutor)
55
56
57
58
59
60
61
62
63
64
65
66
        engine = LLMEngine.from_engine_args(engine_args)
        sampling_params = SamplingParams(max_tokens=1)

        engine.add_request("0", "foo", sampling_params)
        engine.step()

        assert os.path.exists(".marker")
    finally:
        os.chdir(cwd)


@pytest.mark.parametrize("model", ["facebook/opt-125m"])
67
def test_custom_executor_async(model, tmp_path):
68
    cwd = os.path.abspath(".")
69
    os.chdir(tmp_path)
70
71
72
73
    try:
        assert not os.path.exists(".marker")

        engine_args = AsyncEngineArgs(
74
            model=model, distributed_executor_backend=CustomUniExecutorAsync)
75
76
77
78
79
80
81
82
83
84
85
86
87
        engine = AsyncLLMEngine.from_engine_args(engine_args)
        sampling_params = SamplingParams(max_tokens=1)

        async def t():
            stream = await engine.add_request("0", "foo", sampling_params)
            async for x in stream:
                ...

        asyncio.run(t())

        assert os.path.exists(".marker")
    finally:
        os.chdir(cwd)