test_disaggregation_different_tp.py 5.63 KB
Newer Older
1
2
3
4
import os
import time
import unittest
from types import SimpleNamespace
5
from urllib.parse import urlparse
6
7

from sglang.test.few_shot_gsm8k import run_eval as run_eval_few_shot_gsm8k
8
from sglang.test.test_disaggregation_utils import TestDisaggregationBase
9
10
11
12
13
14
15
16
from sglang.test.test_utils import (
    DEFAULT_MODEL_NAME_FOR_TEST_MLA,
    DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
    DEFAULT_URL_FOR_TEST,
    popen_launch_pd_server,
)


17
class TestDisaggregationMooncakePrefillLargerTP(TestDisaggregationBase):
18
19
20
21
22
23
24
    @classmethod
    def setUpClass(cls):
        # Temporarily disable JIT DeepGEMM
        cls.original_jit_deepgemm = os.environ.get("SGL_ENABLE_JIT_DEEPGEMM")
        os.environ["SGL_ENABLE_JIT_DEEPGEMM"] = "false"

        cls.model = DEFAULT_MODEL_NAME_FOR_TEST_MLA
25
26
27
28
29
30
31
32
33
34
        parsed_url = urlparse(DEFAULT_URL_FOR_TEST)
        cls.base_host = parsed_url.hostname
        base_port = str(parsed_url.port)
        cls.lb_port = base_port
        cls.prefill_port = f"{int(base_port) + 100}"
        cls.decode_port = f"{int(base_port) + 200}"
        cls.prefill_url = f"http://{cls.base_host}:{cls.prefill_port}"
        cls.decode_url = f"http://{cls.base_host}:{cls.decode_port}"
        cls.lb_url = f"http://{cls.base_host}:{cls.lb_port}"
        print(f"{cls.base_host=} {cls.lb_port=} {cls.prefill_port=} {cls.decode_port=}")
35

36
37
38
        # Non blocking start servers
        cls.start_prefill()
        cls.start_decode()
39

40
        # Block until both
41
42
43
        cls.wait_server_ready(cls.prefill_url + "/health")
        cls.wait_server_ready(cls.decode_url + "/health")

44
        cls.launch_lb()
45
46
47
48
49
50
51
52

    @classmethod
    def start_prefill(cls):
        prefill_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "prefill",
            "--tp",
53
54
55
            "2",
            "--disaggregation-ib-device",
            "mlx5_roce0,mlx5_roce1",
56
57
58
59
60
61
62
63
64
65
66
67
68
69
        ]
        cls.process_prefill = popen_launch_pd_server(
            cls.model,
            cls.prefill_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=prefill_args,
        )

    @classmethod
    def start_decode(cls):
        decode_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "decode",
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
            "--tp",
            "1",
            "--base-gpu-id",
            "2",
            "--disaggregation-ib-device",
            "mlx5_roce2",
        ]
        cls.process_decode = popen_launch_pd_server(
            cls.model,
            cls.decode_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=decode_args,
        )

    def test_gsm8k(self):
        args = SimpleNamespace(
            num_shots=5,
            data_path=None,
            num_questions=200,
            max_new_tokens=512,
            parallel=128,
            host=f"http://{self.base_host}",
            port=int(self.lb_port),
        )
        metrics = run_eval_few_shot_gsm8k(args)
        print(f"Evaluation metrics: {metrics}")

        self.assertGreater(metrics["accuracy"], 0.60)


100
class TestDisaggregationMooncakeDecodeLargerTP(TestDisaggregationBase):
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
    @classmethod
    def setUpClass(cls):
        # Temporarily disable JIT DeepGEMM
        cls.original_jit_deepgemm = os.environ.get("SGL_ENABLE_JIT_DEEPGEMM")
        os.environ["SGL_ENABLE_JIT_DEEPGEMM"] = "false"

        cls.model = DEFAULT_MODEL_NAME_FOR_TEST_MLA
        parsed_url = urlparse(DEFAULT_URL_FOR_TEST)
        cls.base_host = parsed_url.hostname
        base_port = str(parsed_url.port)
        cls.lb_port = base_port
        cls.prefill_port = f"{int(base_port) + 100}"
        cls.decode_port = f"{int(base_port) + 200}"
        cls.prefill_url = f"http://{cls.base_host}:{cls.prefill_port}"
        cls.decode_url = f"http://{cls.base_host}:{cls.decode_port}"
        cls.lb_url = f"http://{cls.base_host}:{cls.lb_port}"
        print(f"{cls.base_host=} {cls.lb_port=} {cls.prefill_port=} {cls.decode_port=}")

        # Non blocking start servers
        cls.start_prefill()
        cls.start_decode()

        # Block until both
        cls.wait_server_ready(cls.prefill_url + "/health")
        cls.wait_server_ready(cls.decode_url + "/health")

127
        cls.launch_lb()
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152

    @classmethod
    def start_prefill(cls):
        prefill_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "prefill",
            "--tp",
            "1",
            "--disaggregation-ib-device",
            "mlx5_roce0",
        ]
        cls.process_prefill = popen_launch_pd_server(
            cls.model,
            cls.prefill_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=prefill_args,
        )

    @classmethod
    def start_decode(cls):
        decode_args = [
            "--trust-remote-code",
            "--disaggregation-mode",
            "decode",
153
154
155
            "--tp",
            "2",
            "--base-gpu-id",
156
157
158
            "1",
            "--disaggregation-ib-device",
            "mlx5_roce1,mlx5_roce2",
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
        ]
        cls.process_decode = popen_launch_pd_server(
            cls.model,
            cls.decode_url,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
            other_args=decode_args,
        )

    def test_gsm8k(self):
        args = SimpleNamespace(
            num_shots=5,
            data_path=None,
            num_questions=200,
            max_new_tokens=512,
            parallel=128,
174
175
            host=f"http://{self.base_host}",
            port=int(self.lb_port),
176
177
178
179
180
181
182
183
184
        )
        metrics = run_eval_few_shot_gsm8k(args)
        print(f"Evaluation metrics: {metrics}")

        self.assertGreater(metrics["accuracy"], 0.60)


if __name__ == "__main__":
    unittest.main()