test_hidden_states.py 4.53 KB
Newer Older
1
2
3
4
5
6
import unittest

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

import sglang as sgl
7
from sglang.test.test_utils import DEFAULT_SMALL_MODEL_NAME_FOR_TEST
8
9
10
11
12


class TestHiddenState(unittest.TestCase):
    def test_return_hidden_states(self):
        prompts = ["Today is", "Today is a sunny day and I like"]
13
        model_path = DEFAULT_SMALL_MODEL_NAME_FOR_TEST
14
15
16
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        input_ids = tokenizer(prompts).input_ids

17
18
19
20
        sampling_params = {
            "temperature": 0,
            "max_new_tokens": 8,
        }
21
22
23
24
25
26

        engine = sgl.Engine(
            model_path=model_path,
            random_seed=42,
            skip_tokenizer_init=True,
        )
27
28
29
30
31
        outputs = engine.generate(
            input_ids=input_ids,
            sampling_params=sampling_params,
            return_hidden_states=True,
        )
32
33
34
35
        engine.shutdown()

        for output in outputs:
            self.assertEqual(len(output["meta_info"]["hidden_states"]), 8)
36
37
38
39
40
            for i in range(len(output["meta_info"]["hidden_states"])):
                assert isinstance(output["meta_info"]["hidden_states"][i], list)
                output["meta_info"]["hidden_states"][i] = torch.tensor(
                    output["meta_info"]["hidden_states"][i], dtype=torch.bfloat16
                )
41
42
43
44
45
46
47
48
49
50
51
52
53
54
        # Checks that splicing of the batch was done correctly
        self.assertGreater(
            outputs[1]["meta_info"]["hidden_states"][0].shape[0],
            outputs[0]["meta_info"]["hidden_states"][0].shape[0],
        )

        model = AutoModelForCausalLM.from_pretrained(
            model_path, torch_dtype=torch.bfloat16, device_map="cuda"
        )

        for input_id, output in zip(input_ids, outputs):
            with torch.inference_mode():
                hf_out = model(
                    torch.tensor(
55
                        [input_id + output["output_ids"][:-1]], device=model.device
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
                    ),
                    output_hidden_states=True,
                )
            print("=== HF Hiddens ===")
            print(hf_out["hidden_states"][-1][0])
            sg_hidden_states = torch.cat(
                [
                    i.unsqueeze(0) if len(i.shape) == 1 else i
                    for i in output["meta_info"]["hidden_states"]
                ]
            ).to("cuda")
            print("=== SRT Hiddens ===")
            print(sg_hidden_states)

            print(
                f"Max diff: {torch.max(torch.abs(hf_out['hidden_states'][-1][0] - sg_hidden_states))}"
            )

74
            atol = 0.8
75
76
77
78
79
80
81
82
83
            self.assertTrue(
                torch.allclose(
                    hf_out["hidden_states"][-1][0],
                    sg_hidden_states,
                    atol=atol,
                    rtol=0,
                )
            )

84
85
    def test_repeatedly_changes_hidden_states(self):
        prompts = ["Today is", "Today is a sunny day and I like"]
86
        model_path = DEFAULT_SMALL_MODEL_NAME_FOR_TEST
87
88
89
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        input_ids = tokenizer(prompts).input_ids

90
        sampling_params = {
91
92
93
94
95
96
97
98
99
100
            "temperature": 0,
            "max_new_tokens": 8,
        }

        engine = sgl.Engine(
            model_path=model_path,
            random_seed=42,
            skip_tokenizer_init=True,
        )
        outputs_completion_first_round = engine.generate(
101
102
103
            input_ids=input_ids,
            sampling_params=sampling_params,
            return_hidden_states=True,
104
105
        )
        outputs_hidden_state = engine.generate(
106
107
108
            input_ids=input_ids,
            sampling_params=sampling_params,
            return_hidden_states=False,
109
110
111
        )

        outputs_completion_last_round = engine.generate(
112
113
114
            input_ids=input_ids,
            sampling_params=sampling_params,
            return_hidden_states=True,
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
        )
        engine.shutdown()

        for (
            output_completion_first_round,
            output_hidden_state,
            output_completion_last_round,
        ) in zip(
            outputs_completion_first_round,
            outputs_hidden_state,
            outputs_completion_last_round,
        ):
            self.assertEqual(
                len(output_completion_first_round["meta_info"]["hidden_states"]), 8
            )
            self.assertNotIn("hidden_states", output_hidden_state["meta_info"])
            self.assertEqual(
                len(output_completion_last_round["meta_info"]["hidden_states"]), 8
            )

135
136
137

if __name__ == "__main__":
    unittest.main()