conftest.py 3.13 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
4
from typing import Callable

5
6
import pytest

7
from vllm import LLM, EngineArgs
8
from vllm.distributed import cleanup_dist_env_and_memory
9
from vllm.model_executor.model_loader import tensorizer as tensorizer_mod
10
from vllm.model_executor.model_loader.tensorizer import TensorizerConfig
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from vllm.utils import get_distributed_init_method, get_ip, get_open_port
from vllm.v1.executor.abstract import UniProcExecutor
from vllm.worker.worker_base import WorkerWrapperBase

MODEL_REF = "facebook/opt-125m"


@pytest.fixture()
def model_ref():
    return MODEL_REF


@pytest.fixture(autouse=True)
def allow_insecure_serialization(monkeypatch):
    monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
26
27


28
@pytest.fixture(autouse=True)
29
def cleanup():
30
    cleanup_dist_env_and_memory(shutdown_ray=True)
31
32


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@pytest.fixture()
def just_serialize_model_tensors(model_ref, monkeypatch, tmp_path):

    def noop(*args, **kwargs):
        return None

    args = EngineArgs(model=model_ref)
    tc = TensorizerConfig(tensorizer_uri=f"{tmp_path}/model.tensors")

    monkeypatch.setattr(tensorizer_mod, "serialize_extra_artifacts", noop)

    tensorizer_mod.tensorize_vllm_model(args, tc)
    yield tmp_path


48
49
50
@pytest.fixture(autouse=True)
def tensorizer_config():
    config = TensorizerConfig(tensorizer_uri="vllm")
51
    return config
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102


@pytest.fixture()
def model_path(model_ref, tmp_path):
    yield tmp_path / model_ref / "model.tensors"


def assert_from_collective_rpc(engine: LLM, closure: Callable,
                               closure_kwargs: dict):
    res = engine.collective_rpc(method=closure, kwargs=closure_kwargs)
    return all(res)


# This is an object pulled from tests/v1/engine/test_engine_core.py
# Modified to strip the `load_model` method from its `_init_executor`
# method. It's purely used as a dummy utility to run methods that test
# Tensorizer functionality
class DummyExecutor(UniProcExecutor):

    def _init_executor(self) -> None:
        """Initialize the worker and load the model.
        """
        self.driver_worker = WorkerWrapperBase(vllm_config=self.vllm_config,
                                               rpc_rank=0)
        distributed_init_method = get_distributed_init_method(
            get_ip(), get_open_port())
        local_rank = 0
        # set local rank as the device index if specified
        device_info = self.vllm_config.device_config.device.__str__().split(
            ":")
        if len(device_info) > 1:
            local_rank = int(device_info[1])
        rank = 0
        is_driver_worker = True
        kwargs = dict(
            vllm_config=self.vllm_config,
            local_rank=local_rank,
            rank=rank,
            distributed_init_method=distributed_init_method,
            is_driver_worker=is_driver_worker,
        )
        self.collective_rpc("init_worker", args=([kwargs], ))
        self.collective_rpc("init_device")

    @property
    def max_concurrent_batches(self) -> int:
        return 2

    def shutdown(self):
        if hasattr(self, 'thread_pool'):
            self.thread_pool.shutdown(wait=False)