test_sharded_state_loader.py 4.61 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3

4
import fnmatch
5
import multiprocessing as mp
6
7
8
9
10
11
12
13
14
import os
import shutil
from tempfile import TemporaryDirectory

import pytest
import torch
from huggingface_hub import snapshot_download

from vllm import LLM, SamplingParams
15
from vllm.model_executor.model_loader import ShardedStateLoader
16
17
18
19
20
21
22
23
24
25

# Prompts passed to `llm.generate` in `_run_generate`. The same list is used
# for every generation run in this module so that outputs can be compared.
prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]

# Sampling configuration shared by every generation run in this module.
# temperature=0 requests greedy decoding, and ignore_eos=True combined with
# max_tokens=256 keeps generating up to the token limit, so the outputs
# produced before and after re-loading the weights can be compared for
# exact equality.
sampling_params = SamplingParams(
    temperature=0,
    max_tokens=256,
    ignore_eos=True,
)


def test_filter_subtensors():
    """Check that `_filter_subtensors` keeps only the base tensors.

    The state dict holds three freshly allocated tensors ("a", "b", "c") plus
    three extra entries that reuse their storage: "x" is the very same object
    as "b", while "y" and "z" are slices of "c". The filtered result is
    expected to contain exactly "a", "b" and "c", in that order, and to hand
    back the identical tensor objects rather than copies.
    """
    state_dict = {
        "a": torch.empty(2),
        "b": torch.empty((2, 4)),
        "c": torch.empty((2, 4, 8)),
    }
    # Entries that share memory with the tensors above.
    state_dict.update({
        "x": state_dict["b"],
        "y": state_dict["c"][1, 2, :],
        "z": state_dict["c"][1, :, 4],
    })
    filtered_state_dict = ShardedStateLoader._filter_subtensors(state_dict)
    assert tuple(filtered_state_dict.keys()) == ("a", "b", "c")
    for key, tensor in filtered_state_dict.items():
        # NOTE: don't use `equal` here, as the tensor might contain NaNs
        # (`torch.empty` leaves the memory uninitialized), so compare by
        # identity instead.
        assert tensor is state_dict[key]
48
49


50
@pytest.fixture(scope="module")
def llama_3p2_1b_files():
    """Yield the local snapshot directory of Llama-3.2-1B-Instruct.

    The fixture is module-scoped, so `snapshot_download` is invoked once for
    all tests in this module. Files matching ``*.bin*`` and everything under
    ``original/`` are excluded from the download.
    """
    input_dir = snapshot_download("meta-llama/Llama-3.2-1B-Instruct",
                                  ignore_patterns=["*.bin*", "original/*"])

    yield input_dir
56
57
58
59
60
61
62
63


def _run_writer(input_dir, output_dir, weights_patterns, **kwargs):
    """Save the model's sharded state to ``output_dir`` along with metadata.

    Intended to be used as a subprocess target.

    Args:
        input_dir: Directory of the source model; passed to ``LLM(model=...)``.
        output_dir: Directory that receives the sharded state and the copied
            non-weight files.
        weights_patterns: Iterable of ``fnmatch`` patterns identifying weight
            files in ``input_dir``; matching files are NOT copied.
        **kwargs: Forwarded unchanged to the ``LLM`` constructor.
    """
    llm_sharded_writer = LLM(model=input_dir, **kwargs)

    # Dump worker states to output directory
    llm_sharded_writer.llm_engine.model_executor.save_sharded_state(
        path=output_dir)

    # Copy metadata files to output directory: sub-directories are copied
    # wholesale, regular files only when they are not weight files.
    for file in os.listdir(input_dir):
        src = os.path.join(input_dir, file)
        if os.path.isdir(src):
            shutil.copytree(src, os.path.join(output_dir, file))
        elif not any(fnmatch.fnmatch(file, ext) for ext in weights_patterns):
            shutil.copy(src, output_dir)
72
73
74
75
76
77
78
79
80
81


def _run_generate(input_dir, queue: mp.Queue, **kwargs):
    """Generate completions for the module prompts and send them to ``queue``.

    Intended to be used as a subprocess target.

    Args:
        input_dir: Model directory; passed to ``LLM(model=...)``.
        queue: Multiprocessing queue that receives a single item: a list with
            one ``__dict__`` per prompt, taken from the first completion of
            each request output.
        **kwargs: Forwarded unchanged to the ``LLM`` constructor.
    """
    engine = LLM(model=input_dir, **kwargs)

    serialized = []
    for request_output in engine.generate(prompts, sampling_params):
        first_completion = request_output.outputs[0]
        # Ship the attribute dict rather than the object itself; presumably so
        # the parent can compare results by plain value equality.
        serialized.append(first_completion.__dict__)

    queue.put(serialized)
    # Close our end and wait for the queue's background thread to finish
    # flushing before this process returns.
    queue.close()
    queue.join_thread()


82
def _generate_in_subprocess(ctx, model_dir, result_queue, **llm_kwargs):
    """Run `_run_generate` in a child process and return what it queued.

    The queue is drained *before* the child is joined: joining a process that
    still has un-consumed items on a multiprocessing queue can deadlock. While
    waiting, the child's liveness is monitored so that a crashed child fails
    the test instead of blocking forever on an empty queue.

    Args:
        ctx: Multiprocessing context used to create the process.
        model_dir: Model directory handed to `_run_generate`.
        result_queue: Queue created from ``ctx`` that the child writes to.
        **llm_kwargs: Forwarded to `_run_generate` (and from there to ``LLM``).

    Returns:
        The single item the child put on ``result_queue``.
    """
    # Local import: only this helper needs it.
    from queue import Empty

    proc = ctx.Process(target=_run_generate,
                       args=(model_dir, result_queue),
                       kwargs=llm_kwargs)
    proc.start()
    while True:
        # Sample liveness BEFORE polling. If the child had already exited and
        # the poll still comes back empty, no result will ever arrive
        # (`_run_generate` flushes its queue before returning).
        child_running = proc.is_alive()
        try:
            outputs = result_queue.get(timeout=1.0)
            break
        except Empty:
            if not child_running:
                proc.join()
                pytest.fail(
                    f"Generation subprocess for {model_dir!r} exited with "
                    f"code {proc.exitcode} without producing outputs")
    proc.join()
    return outputs


@pytest.mark.parametrize("enable_lora", [False, True])
@pytest.mark.parametrize("tp_size", [1, 2])
def test_sharded_state_loader(enable_lora, tp_size, num_gpus_available,
                              llama_3p2_1b_files,
                              monkeypatch: pytest.MonkeyPatch):
    """Outputs must match between the original and the sharded-state weights.

    Steps:
      1. A writer subprocess loads the model and saves its sharded state (plus
         non-weight files) into a temporary directory.
      2. A subprocess generates from the original model directory.
      3. A subprocess generates from the temporary directory using
         ``load_format="sharded_state"``.
      4. Both sets of outputs are compared for equality.

    The test is skipped when fewer GPUs than ``tp_size`` are available.
    """
    if num_gpus_available < tp_size:
        pytest.skip(f"Not enough GPUs for tensor parallelism {tp_size}")

    weights_patterns = ("*.safetensors", )
    gpu_memory_utilization = 0.8
    input_dir = llama_3p2_1b_files
    ctx = mp.get_context("spawn")
    # The V1 engine exposes a different interface and this test hangs under
    # it, so force the V0 engine. Child processes inherit the environment.
    monkeypatch.setenv("VLLM_USE_V1", "0")

    # Engine arguments shared by both generation runs.
    generate_kwargs = dict(
        distributed_executor_backend="mp",
        enable_lora=enable_lora,
        gpu_memory_utilization=gpu_memory_utilization,
        tensor_parallel_size=tp_size,
    )

    # Run in separate processes for memory & CUDA isolation
    with TemporaryDirectory() as output_dir:
        writer = ctx.Process(target=_run_writer,
                             args=(input_dir, output_dir, weights_patterns),
                             kwargs=dict(
                                 tensor_parallel_size=tp_size,
                                 distributed_executor_backend="mp",
                                 gpu_memory_utilization=gpu_memory_utilization,
                                 enforce_eager=True,
                             ))
        writer.start()
        writer.join()
        # Fail fast with a clear message instead of letting the later
        # sharded-state load trip over a missing or partial checkpoint.
        assert writer.exitcode == 0, (
            f"Sharded-state writer subprocess exited with code "
            f"{writer.exitcode}")

        queue = ctx.Queue()

        out_before = _generate_in_subprocess(ctx, input_dir, queue,
                                             **generate_kwargs)
        out_after = _generate_in_subprocess(ctx,
                                            output_dir,
                                            queue,
                                            load_format="sharded_state",
                                            **generate_kwargs)

        assert out_before == out_after