test_sharded_state_loader.py 4.32 KB
Newer Older
1
2
# SPDX-License-Identifier: Apache-2.0

3
import multiprocessing as mp
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import os
import shutil
from tempfile import TemporaryDirectory

import pytest
import torch
from huggingface_hub import snapshot_download

from vllm import LLM, SamplingParams
from vllm.model_executor.model_loader.loader import ShardedStateLoader

# Fixed prompt set used for every generation run in this module.
prompts = [
    "Hello, my name is", "The president of the United States is",
    "The capital of France is", "The future of AI is"
]

# Shared sampling configuration: temperature=0 together with ignore_eos and
# max_tokens=256 is presumably chosen so that two runs over the same weights
# yield directly comparable outputs of a fixed length.
sampling_params = SamplingParams(ignore_eos=True,
                                 max_tokens=256,
                                 temperature=0)


def test_filter_subtensors():
    """Check that ``_filter_subtensors`` keeps only "a", "b" and "c".

    The input maps "a", "b", "c" to freshly allocated tensors, "x" to the
    very same tensor object as "b", and "y" / "z" to slices indexed out of
    "c". The filtered result must contain exactly the first three keys, in
    that order, each mapped to the identical tensor object that was passed in.
    """
    vec = torch.empty(2)
    mat = torch.empty((2, 4))
    cube = torch.empty((2, 4, 8))
    state_dict = {
        "a": vec,
        "b": mat,
        "c": cube,
        # Entries below reuse (part of) the tensors registered above.
        "x": mat,
        "y": cube[1, 2, :],
        "z": cube[1, :, 4],
    }

    filtered_state_dict = ShardedStateLoader._filter_subtensors(state_dict)

    assert list(filtered_state_dict.keys()) == ["a", "b", "c"]
    for name, kept in filtered_state_dict.items():
        # NOTE: compare by identity rather than with `equal`, since tensors
        # created with `torch.empty` might contain NaNs
        assert kept is state_dict[name]
46
47


48
49
50
@pytest.fixture(scope="module")
def llama_2_7b_files():
    """Yield the local snapshot directory of the model under test.

    The snapshot is downloaded into a temporary cache directory that is
    removed once the module-scoped fixture is torn down. Files matching
    ``*.bin*`` or ``original/*`` are not downloaded.

    NOTE(review): despite the fixture name, the repository fetched here is
    ``meta-llama/Llama-3.2-1B``.
    """
    skipped_patterns = ["*.bin*", "original/*"]
    with TemporaryDirectory() as download_root:
        snapshot_path = snapshot_download(
            "meta-llama/Llama-3.2-1B",
            cache_dir=download_root,
            ignore_patterns=skipped_patterns,
        )
        yield snapshot_path


def _run_writer(input_dir, output_dir, weights_patterns, **kwargs):
    """Save a model's sharded state plus its metadata files to ``output_dir``.

    Loads the model found at ``input_dir`` with ``LLM``, dumps the worker
    states through ``save_sharded_state`` and then copies every regular file
    of ``input_dir`` whose name matches none of ``weights_patterns``.

    Args:
        input_dir: Directory holding the original checkpoint.
        output_dir: Directory that receives the sharded state and the copied
            metadata files.
        weights_patterns: Iterable of glob patterns (e.g. ``"*.safetensors"``)
            naming the original weight files, which must not be copied.
        **kwargs: Forwarded unchanged to the ``LLM`` constructor.
    """
    # Imported locally so this helper does not rely on a module-level import.
    import fnmatch

    llm_sharded_writer = LLM(model=input_dir, **kwargs)

    # Dump worker states to output directory
    llm_sharded_writer.llm_engine.model_executor.save_sharded_state(
        path=output_dir)

    # Copy metadata files to output directory
    for file in os.listdir(input_dir):
        src = os.path.join(input_dir, file)
        # Resolve against input_dir; a bare name would be checked against the
        # current working directory instead. Directories cannot be handled by
        # shutil.copy, so they are skipped.
        if os.path.isdir(src):
            continue
        # The patterns are globs, so they need fnmatch: a plain
        # str.endswith("*.safetensors") never matches and would let the
        # original weight files through.
        if any(fnmatch.fnmatch(file, pattern) for pattern in weights_patterns):
            continue
        shutil.copy(src, output_dir)


def _run_generate(input_dir, queue: mp.Queue, **kwargs):
    """Generate completions for the module prompts and send them to ``queue``.

    Builds an ``LLM`` from ``input_dir`` (extra ``kwargs`` go to its
    constructor), runs the module-level ``prompts`` with ``sampling_params``
    and puts one list on ``queue`` holding the attribute dict of the first
    output of every generation result.
    """
    engine = LLM(model=input_dir, **kwargs)
    results = engine.generate(prompts, sampling_params)
    first_outputs = [vars(result.outputs[0]) for result in results]
    queue.put(first_outputs)
    # Nothing more will be sent; wait for the queue's feeder thread to finish
    # before this function returns.
    queue.close()
    queue.join_thread()


81
@pytest.mark.parametrize("enable_lora", [False, True])
@pytest.mark.parametrize("tp_size", [1, 2])
def test_sharded_state_loader(enable_lora, tp_size, num_gpus_available,
                              llama_2_7b_files):
    """Check that a sharded-state checkpoint reproduces the original outputs.

    Steps, each executed in its own spawned process:

    1. ``_run_writer`` saves the sharded state of the downloaded model into a
       temporary output directory.
    2. ``_run_generate`` produces outputs from the original checkpoint.
    3. ``_run_generate`` produces outputs from the sharded checkpoint using
       ``load_format="sharded_state"``.

    The outputs of steps 2 and 3 must compare equal. The test is skipped when
    fewer than ``tp_size`` GPUs are reported by ``num_gpus_available``.
    """
    if num_gpus_available < tp_size:
        pytest.skip(f"Not enough GPUs for tensor parallelism {tp_size}")

    weights_patterns = ("*.safetensors", )
    gpu_memory_utilization = 0.8
    input_dir = llama_2_7b_files
    ctx = mp.get_context("spawn")

    # Run in separate processes for memory & CUDA isolation
    with TemporaryDirectory() as output_dir:
        p = ctx.Process(target=_run_writer,
                        args=(input_dir, output_dir, weights_patterns),
                        kwargs=dict(
                            tensor_parallel_size=tp_size,
                            distributed_executor_backend="mp",
                            gpu_memory_utilization=gpu_memory_utilization,
                            enforce_eager=True,
                        ))
        p.start()
        p.join()

        queue = ctx.Queue()

        p = ctx.Process(target=_run_generate,
                        args=(input_dir, queue),
                        kwargs=dict(
                            distributed_executor_backend="mp",
                            enable_lora=enable_lora,
                            gpu_memory_utilization=gpu_memory_utilization,
                            tensor_parallel_size=tp_size,
                        ))
        p.start()
        # Drain the queue BEFORE joining: the child waits in
        # queue.join_thread() until its data has been flushed to the pipe, so
        # joining first can deadlock when the payload exceeds the pipe buffer.
        out_before = queue.get()
        p.join()

        p = ctx.Process(target=_run_generate,
                        args=(output_dir, queue),
                        kwargs=dict(
                            distributed_executor_backend="mp",
                            enable_lora=enable_lora,
                            gpu_memory_utilization=gpu_memory_utilization,
                            tensor_parallel_size=tp_size,
                            load_format="sharded_state",
                        ))
        p.start()
        # Same ordering as above: receive first, then join.
        out_after = queue.get()
        p.join()

        assert out_before == out_after