conftest.py 5.87 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import socket
import subprocess
import time
from types import SimpleNamespace
from urllib.parse import urlparse

import pytest
import requests

from sglang.test.test_utils import (
    DEFAULT_MODEL_NAME_FOR_TEST,
    DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
    DEFAULT_URL_FOR_TEST,
)


def _find_available_port() -> int:
    """Return a TCP port number that was free on 127.0.0.1 at call time.

    Binding to port 0 makes the OS pick an unused port. The probe socket is
    closed before returning, so the port is not reserved: another process
    may grab it between this call and the moment the caller binds it.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("127.0.0.1", 0))
        _address, chosen_port = probe.getsockname()
        return chosen_port
    finally:
        probe.close()


def _parse_url(base_url: str) -> tuple[str, str]:
    """Split a base URL into ``(host, port)``, both as strings.

    Parsing is delegated to ``urlparse`` rather than string splitting, so the
    scheme and any trailing path do not affect the result.

    Returns:
        The hostname (``"127.0.0.1"`` when the URL has none) and the port
        rendered as a string (``""`` when the URL names no explicit port).
    """
    pieces = urlparse(base_url)
    host = pieces.hostname or "127.0.0.1"
    if pieces.port is None:
        port_text = ""
    else:
        port_text = str(pieces.port)
    return host, port_text


def _wait_router_health(base_url: str, timeout: float) -> None:
    """Poll ``{base_url}/health`` until it answers HTTP 200.

    Args:
        base_url: Root URL of the router; ``/health`` is appended to it.
        timeout: Overall budget in seconds, measured with ``perf_counter``.

    Raises:
        TimeoutError: If no 200 response was seen before the budget ran out.
    """
    health_endpoint = f"{base_url}/health"
    began = time.perf_counter()
    with requests.Session() as http:
        while time.perf_counter() - began < timeout:
            try:
                response = http.get(health_endpoint, timeout=5)
            except requests.RequestException:
                # Connection refused / timeouts are expected while the
                # router is still starting; just try again.
                pass
            else:
                if response.status_code == 200:
                    return
            time.sleep(2)
    raise TimeoutError("Router failed to become healthy in time")


def _popen_launch_router(
    model: str,
    base_url: str,
    dp_size: int,
    timeout: float,
    policy: str = "cache_aware",
) -> subprocess.Popen:
    """Start ``sglang_router.launch_server`` and wait until it is healthy.

    Args:
        model: Value passed to ``--model-path``.
        base_url: URL the server should listen on; its host and port are
            extracted with ``_parse_url`` and it is also the URL polled for
            health.
        dp_size: Value passed to ``--dp``.
        timeout: Seconds to wait for the health endpoint to return 200.
        policy: Value passed to ``--router-policy``.

    Returns:
        The handle of the running child process.

    Raises:
        TimeoutError: If the server does not become healthy in time. The
            child process is terminated before the error propagates.
    """
    host, port = _parse_url(base_url)

    # Prometheus gets its own port, picked from whatever is free right now.
    prom_port = _find_available_port()

    cmd = [
        "python3",
        "-m",
        "sglang_router.launch_server",
        "--model-path",
        model,
        "--host",
        host,
        "--port",
        port,
        "--dp",
        str(dp_size),
        "--router-policy",
        policy,
        "--allow-auto-truncate",
        "--router-prometheus-port",
        str(prom_port),
        "--router-prometheus-host",
        "127.0.0.1",
    ]

    proc = subprocess.Popen(cmd)
    try:
        _wait_router_health(base_url, timeout)
    except BaseException:
        # On failure the caller never receives `proc`, so nobody else could
        # stop the child; clean it up here instead of leaking it.
        _terminate(proc)
        raise
    return proc


def _popen_launch_worker(
    model: str,
    base_url: str,
    *,
    dp_size: int | None = None,
    api_key: str | None = None,
) -> subprocess.Popen:
    """Spawn an ``sglang.launch_server`` worker without waiting for readiness.

    Args:
        model: Value passed to ``--model-path``.
        base_url: URL the worker should listen on; host and port are
            extracted with ``_parse_url``.
        dp_size: When given, forwarded as ``--dp-size``.
        api_key: When given, forwarded as ``--api-key``.

    Returns:
        The handle of the freshly started child process.
    """
    host, port = _parse_url(base_url)

    # Optional flags are collected separately and appended after the fixed part.
    optional_flags = []
    if dp_size is not None:
        optional_flags.extend(("--dp-size", str(dp_size)))
    if api_key is not None:
        optional_flags.extend(("--api-key", api_key))

    argv = [
        "python3",
        "-m",
        "sglang.launch_server",
        "--model-path",
        model,
        "--host",
        host,
        "--port",
        port,
        "--base-gpu-id",
        "0",
        *optional_flags,
    ]
    return subprocess.Popen(argv)


def _popen_launch_router_only(
    base_url: str,
    policy: str = "round_robin",
    timeout: float = 120.0,
    *,
    dp_aware: bool = False,
    api_key: str | None = None,
) -> subprocess.Popen:
    """Start ``sglang_router.launch_router`` and wait until it is healthy.

    Args:
        base_url: URL the router should listen on; its host and port are
            extracted with ``_parse_url`` and it is also the URL polled for
            health.
        policy: Value passed to ``--policy``.
        timeout: Seconds to wait for the health endpoint to return 200.
        dp_aware: When true, ``--dp-aware`` is added to the command line.
        api_key: When given, forwarded as ``--api-key``.

    Returns:
        The handle of the running router process.

    Raises:
        TimeoutError: If the router does not become healthy in time. The
            child process is terminated before the error propagates.
    """
    host, port = _parse_url(base_url)

    # Prometheus gets its own port, picked from whatever is free right now.
    prom_port = _find_available_port()
    cmd = [
        "python3",
        "-m",
        "sglang_router.launch_router",
        "--host",
        host,
        "--port",
        port,
        "--policy",
        policy,
    ]
    if dp_aware:
        cmd += ["--dp-aware"]
    if api_key is not None:
        cmd += ["--api-key", api_key]
    cmd += [
        "--prometheus-port",
        str(prom_port),
        "--prometheus-host",
        "127.0.0.1",
    ]
    proc = subprocess.Popen(cmd)
    try:
        _wait_router_health(base_url, timeout)
    except BaseException:
        # On failure the caller never receives `proc`, so nobody else could
        # stop the child; clean it up here instead of leaking it.
        _terminate(proc)
        raise
    return proc


def _terminate(proc: subprocess.Popen, timeout: float = 120) -> None:
    if proc is None:
        return
    proc.terminate()
    start = time.perf_counter()
    while proc.poll() is None:
        if time.perf_counter() - start > timeout:
            proc.kill()
            break
        time.sleep(1)


def pytest_configure(config):
    """Register the custom ``e2e`` marker in the ``markers`` ini setting."""
    marker_line = "e2e: mark as end-to-end test"
    config.addinivalue_line("markers", marker_line)


@pytest.fixture(scope="session")
def e2e_model() -> str:
    """Session-scoped fixture yielding the model name used by the e2e tests."""
    # There is no override hook: the default test model is always used.
    model_name = DEFAULT_MODEL_NAME_FOR_TEST
    return model_name


@pytest.fixture
def e2e_router(e2e_model: str):
    """Launch a full router + workers stack (``--dp 2``) at the default test URL.

    Yields a namespace with ``proc`` (the child process) and ``url``; the
    process is terminated on teardown.
    """
    # Kept for completeness; per the original note, the tests rely on the
    # router-only fixtures instead to avoid GPU contention.
    router_url = DEFAULT_URL_FOR_TEST
    router_proc = _popen_launch_router(
        e2e_model,
        router_url,
        dp_size=2,
        timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
    )
    try:
        yield SimpleNamespace(proc=router_proc, url=router_url)
    finally:
        _terminate(router_proc)


@pytest.fixture
def e2e_router_only_rr():
    """Launch a standalone round-robin router on a free local port.

    Yields a namespace with ``proc`` (the child process) and ``url``; the
    process is terminated on teardown.
    """
    router_url = f"http://127.0.0.1:{_find_available_port()}"
    router_proc = _popen_launch_router_only(router_url, policy="round_robin")
    try:
        yield SimpleNamespace(proc=router_proc, url=router_url)
    finally:
        _terminate(router_proc)


@pytest.fixture(scope="session")
def e2e_primary_worker(e2e_model: str):
    """Launch one worker for the whole session on a free local port.

    Yields a namespace with ``proc`` (the child process) and ``url``; the
    process is terminated on teardown.
    """
    worker_url = f"http://127.0.0.1:{_find_available_port()}"
    worker_proc = _popen_launch_worker(e2e_model, worker_url)
    # No readiness wait here: per the original note, the router's health
    # gate is what handles worker readiness.
    try:
        yield SimpleNamespace(proc=worker_proc, url=worker_url)
    finally:
        _terminate(worker_proc)


@pytest.fixture
def e2e_router_only_rr_dp_aware_api():
    """Launch a standalone round-robin router with ``--dp-aware`` and an API key.

    Yields a namespace with ``proc``, ``url`` and the ``api_key`` the router
    was started with; the process is terminated on teardown.
    """
    api_key = "secret"
    router_url = f"http://127.0.0.1:{_find_available_port()}"
    router_proc = _popen_launch_router_only(
        router_url,
        policy="round_robin",
        timeout=180.0,
        dp_aware=True,
        api_key=api_key,
    )
    try:
        yield SimpleNamespace(proc=router_proc, url=router_url, api_key=api_key)
    finally:
        _terminate(router_proc)


@pytest.fixture
def e2e_worker_dp2_api(e2e_model: str, e2e_router_only_rr_dp_aware_api):
    """Launch a ``--dp-size 2`` worker sharing the dp-aware router's API key.

    Yields a namespace with ``proc`` (the child process) and ``url``; the
    process is terminated on teardown.
    """
    worker_url = f"http://127.0.0.1:{_find_available_port()}"
    shared_key = e2e_router_only_rr_dp_aware_api.api_key
    worker_proc = _popen_launch_worker(
        e2e_model,
        worker_url,
        dp_size=2,
        api_key=shared_key,
    )
    try:
        yield SimpleNamespace(proc=worker_proc, url=worker_url)
    finally:
        _terminate(worker_proc)