test_launch_server.py 6.65 KB
Newer Older
1
import socket
Byron Hsu's avatar
Byron Hsu committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import subprocess
import time
import unittest
from types import SimpleNamespace

import requests

from sglang.test.run_eval import run_eval
from sglang.test.test_utils import (
    DEFAULT_MODEL_NAME_FOR_TEST,
    DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
    DEFAULT_URL_FOR_TEST,
)


def popen_launch_router(
    model: str,
    base_url: str,
    dp_size: int,
    timeout: float,
22
    policy: str = "cache_aware",
Byron Hsu's avatar
Byron Hsu committed
23
24
25
26
27
28
29
30
31
):
    """
    Launch the router server process.

    Args:
        model: Model path/name
        base_url: Server base URL
        dp_size: Data parallel size
        timeout: Server launch timeout
32
        policy: Router policy, one of "cache_aware", "round_robin", "random"
Byron Hsu's avatar
Byron Hsu committed
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    """
    _, host, port = base_url.split(":")
    host = host[2:]

    command = [
        "python3",
        "-m",
        "sglang_router.launch_server",
        "--model-path",
        model,
        "--host",
        host,
        "--port",
        port,
        "--dp",
        str(dp_size),  # Convert dp_size to string
49
50
        "--router-eviction-interval",
        "5",  # frequent eviction for testing
51
52
        "--router-policy",
        policy,
Byron Hsu's avatar
Byron Hsu committed
53
54
    ]

55
    process = subprocess.Popen(command, stdout=None, stderr=None)
Byron Hsu's avatar
Byron Hsu committed
56
57
58
59
60
61
62

    start_time = time.time()
    with requests.Session() as session:
        while time.time() - start_time < timeout:
            try:
                response = session.get(f"{base_url}/health")
                if response.status_code == 200:
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
                    print(f"Router {base_url} is healthy")
                    return process
            except requests.RequestException:
                pass
            time.sleep(10)

    raise TimeoutError("Router failed to start within the timeout period.")


def find_available_port():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def popen_launch_server(
    model: str,
    base_url: str,
    timeout: float,
):
    _, host, port = base_url.split(":")
    host = host[2:]

    command = [
        "python3",
        "-m",
        "sglang.launch_server",
        "--model-path",
        model,
        "--host",
        host,
        "--port",
        port,
        "--base-gpu-id",
        "1",
    ]

    process = subprocess.Popen(command, stdout=None, stderr=None)

102
103
    # intentionally don't wait and defer the job to the router health check
    return process
Byron Hsu's avatar
Byron Hsu committed
104
105


106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def terminate_and_wait(process, timeout=300):
    """Terminate a process and wait until it is terminated.

    Args:
        process: subprocess.Popen object
        timeout: maximum time to wait in seconds

    Raises:
        TimeoutError: if process does not terminate within timeout
    """
    if process is None:
        return

    process.terminate()
    start_time = time.time()

    while process.poll() is None:
        print(f"Terminating process {process.pid}")
        if time.time() - start_time > timeout:
            raise TimeoutError(
                f"Process {process.pid} failed to terminate within {timeout}s"
            )
        time.sleep(1)

    print(f"Process {process.pid} is successfully terminated")


133
class TestLaunchServer(unittest.TestCase):
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
    def setUp(self):
        self.model = DEFAULT_MODEL_NAME_FOR_TEST
        self.base_url = DEFAULT_URL_FOR_TEST
        self.process = None
        self.other_process = []

    def tearDown(self):
        print("Running tearDown...")
        if self.process:
            terminate_and_wait(self.process)
        for process in self.other_process:
            terminate_and_wait(process)
        print("tearDown done")

    def test_1_mmlu(self):
        print("Running test_1_mmlu...")
150
        # DP size = 2
151
        self.process = popen_launch_router(
152
153
154
155
            self.model,
            self.base_url,
            dp_size=2,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
156
            policy="cache_aware",
157
158
        )

Byron Hsu's avatar
Byron Hsu committed
159
160
161
162
163
164
165
166
167
168
        args = SimpleNamespace(
            base_url=self.base_url,
            model=self.model,
            eval_name="mmlu",
            num_examples=64,
            num_threads=32,
            temperature=0.1,
        )

        metrics = run_eval(args)
Byron Hsu's avatar
Byron Hsu committed
169
170
171
172
173
        score = metrics["score"]
        THRESHOLD = 0.65
        passed = score >= THRESHOLD
        msg = f"MMLU test {'passed' if passed else 'failed'} with score {score:.3f} (threshold: {THRESHOLD})"
        self.assertGreaterEqual(score, THRESHOLD, msg)
Byron Hsu's avatar
Byron Hsu committed
174

175
176
    def test_2_add_and_remove_worker(self):
        print("Running test_2_add_and_remove_worker...")
177
        # DP size = 1
178
        self.process = popen_launch_router(
179
180
181
182
            self.model,
            self.base_url,
            dp_size=1,
            timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
183
            policy="round_robin",  # use round robin to make sure every worker processes requests
184
        )
185
186
187
188
189
190
        # 1. start a worker, and wait until it is healthy
        port = find_available_port()
        worker_url = f"http://127.0.0.1:{port}"
        worker_process = popen_launch_server(
            self.model, worker_url, DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH
        )
191
        self.other_process.append(worker_process)
192
193

        # 2. use /add_worker api to add it the the router. It will be used by router after it is healthy
194
195
196
197
        with requests.Session() as session:
            response = session.post(f"{self.base_url}/add_worker?url={worker_url}")
            print(f"status code: {response.status_code}, response: {response.text}")
            self.assertEqual(response.status_code, 200)
198

199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
        # 3. run mmlu
        args = SimpleNamespace(
            base_url=self.base_url,
            model=self.model,
            eval_name="mmlu",
            num_examples=64,
            num_threads=32,
            temperature=0.1,
        )
        metrics = run_eval(args)
        score = metrics["score"]
        THRESHOLD = 0.65
        passed = score >= THRESHOLD
        msg = f"MMLU test {'passed' if passed else 'failed'} with score {score:.3f} (threshold: {THRESHOLD})"
        self.assertGreaterEqual(score, THRESHOLD, msg)

215
216
217
218
219
220
221
222
223
224
225
226
227
228
        # 4. use /remove_worker api to remove it from the router
        with requests.Session() as session:
            response = session.post(f"{self.base_url}/remove_worker?url={worker_url}")
            print(f"status code: {response.status_code}, response: {response.text}")
            self.assertEqual(response.status_code, 200)

        # 5. run mmlu again
        metrics = run_eval(args)
        score = metrics["score"]
        THRESHOLD = 0.65
        passed = score >= THRESHOLD
        msg = f"MMLU test {'passed' if passed else 'failed'} with score {score:.3f} (threshold: {THRESHOLD})"
        self.assertGreaterEqual(score, THRESHOLD, msg)

Byron Hsu's avatar
Byron Hsu committed
229
230
231

if __name__ == "__main__":
    unittest.main()