"vllm/utils/argparse_utils.py" did not exist on "b3230e1ac0f5c6b46f6c099ef0b3806df8a14feb"
test_executor.py 4.65 KB
Newer Older
1
2
3
4
5
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import asyncio
import os
6
from collections.abc import Callable
7
from concurrent.futures import Future
8
from typing import Any
9
10
11

import pytest

12
from vllm.distributed.kv_transfer.kv_connector.utils import KVOutputAggregator
13
14
15
16
from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
from vllm.sampling_params import SamplingParams
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.engine.llm_engine import LLMEngine
17
from vllm.v1.executor.abstract import Executor
18
from vllm.v1.executor.multiproc_executor import MultiprocExecutor
19
20
21
22
from vllm.v1.executor.uniproc_executor import (
    ExecutorWithExternalLauncher,
    UniProcExecutor,
)
23
24


25
class Mock: ...
26
27


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def test_supports_async_scheduling_base_executor():
    assert Executor.supports_async_scheduling() is False


def test_supports_async_scheduling_uniproc_executor():
    assert UniProcExecutor.supports_async_scheduling() is True


def test_supports_async_scheduling_executor_with_external_launcher():
    # ExecutorWithExternalLauncher inherits from UniProcExecutor and does not
    # override supports_async_scheduling, so it should return True.
    assert ExecutorWithExternalLauncher.supports_async_scheduling() is True


def test_supports_async_scheduling_multiproc_executor():
    assert MultiprocExecutor.supports_async_scheduling() is True


46
class CustomMultiprocExecutor(MultiprocExecutor):
47
48
    def collective_rpc(
        self,
49
50
        method: str | Callable,
        timeout: float | None = None,
51
        args: tuple = (),
52
        kwargs: dict | None = None,
53
        non_block: bool = False,
54
        unique_reply_rank: int | None = None,
55
        kv_output_aggregator: KVOutputAggregator = None,
56
    ) -> Any | list[Any] | Future[Any | list[Any]]:
57
        # Drop marker to show that this was run
58
59
        with open(".marker", "w"):
            ...
60
        return super().collective_rpc(
61
62
63
64
65
66
67
            method,
            timeout,
            args,
            kwargs,
            non_block,
            unique_reply_rank,
            kv_output_aggregator,
68
        )
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84


CustomMultiprocExecutorAsync = CustomMultiprocExecutor
MODEL = "Qwen/Qwen3-0.6B"


def test_custom_executor_type_checking():
    with pytest.raises(ValueError):
        engine_args = EngineArgs(
            model=MODEL,
            gpu_memory_utilization=0.2,
            max_model_len=8192,
            distributed_executor_backend=Mock,
        )
        LLMEngine.from_engine_args(engine_args)
    with pytest.raises(ValueError):
85
86
87
88
89
90
        engine_args = AsyncEngineArgs(
            model=MODEL,
            gpu_memory_utilization=0.2,
            max_model_len=8192,
            distributed_executor_backend=Mock,
        )
91
92
93
        AsyncLLM.from_engine_args(engine_args)


94
95
96
97
98
99
100
@pytest.mark.parametrize(
    "distributed_executor_backend",
    [
        CustomMultiprocExecutor,
        "tests.v1.executor.test_executor.CustomMultiprocExecutor",
    ],
)
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def test_custom_executor(distributed_executor_backend, tmp_path):
    cwd = os.path.abspath(".")
    os.chdir(tmp_path)
    try:
        assert not os.path.exists(".marker")

        engine_args = EngineArgs(
            model=MODEL,
            gpu_memory_utilization=0.2,
            max_model_len=8192,
            distributed_executor_backend=distributed_executor_backend,
            enforce_eager=True,  # reduce test time
        )
        engine = LLMEngine.from_engine_args(engine_args)
        sampling_params = SamplingParams(max_tokens=1)

        engine.add_request("0", "foo", sampling_params)
        engine.step()

        assert os.path.exists(".marker")
    finally:
        os.chdir(cwd)


125
126
127
128
129
130
131
@pytest.mark.parametrize(
    "distributed_executor_backend",
    [
        CustomMultiprocExecutorAsync,
        "tests.v1.executor.test_executor.CustomMultiprocExecutorAsync",
    ],
)
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def test_custom_executor_async(distributed_executor_backend, tmp_path):
    cwd = os.path.abspath(".")
    os.chdir(tmp_path)
    try:
        assert not os.path.exists(".marker")

        engine_args = AsyncEngineArgs(
            model=MODEL,
            gpu_memory_utilization=0.2,
            max_model_len=8192,
            distributed_executor_backend=distributed_executor_backend,
            enforce_eager=True,  # reduce test time
        )
        engine = AsyncLLM.from_engine_args(engine_args)
        sampling_params = SamplingParams(max_tokens=1)

        async def t():
149
150
151
            stream = engine.generate(
                request_id="0", prompt="foo", sampling_params=sampling_params
            )
152
153
154
155
156
157
158
159
            async for x in stream:
                ...

        asyncio.run(t())

        assert os.path.exists(".marker")
    finally:
        os.chdir(cwd)