# test_regular_router.py 7.03 KB
# Newer Older
1
2
3
4
5
6
7
8
9
10
import threading
import time
from types import SimpleNamespace

import pytest
import requests

from sglang.test.run_eval import run_eval


11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def _wait_for_workers(
    base_url: str, expected_count: int, timeout: float = 60.0, headers: dict = None
) -> None:
    """Block until the router reports at least ``expected_count`` workers.

    Repeatedly issues ``GET {base_url}/workers`` and inspects the ``"workers"``
    list of the JSON body, sleeping 0.5s between attempts.

    Args:
        base_url: Root URL of the router (without the ``/workers`` suffix).
        expected_count: Minimum number of entries required in ``"workers"``.
        timeout: Overall polling budget in seconds. The deadline is only
            checked between attempts; each request has its own 5s timeout.
        headers: Optional HTTP headers sent with every poll request.

    Raises:
        TimeoutError: If the expected count is not observed before the
            polling budget is exhausted.
    """
    start = time.perf_counter()
    with requests.Session() as session:
        while time.perf_counter() - start < timeout:
            try:
                r = session.get(f"{base_url}/workers", headers=headers, timeout=5)
                if r.status_code == 200:
                    workers = r.json().get("workers", [])
                    if len(workers) >= expected_count:
                        return
            except (requests.RequestException, ValueError):
                # Transport failures and bodies that are not valid JSON both
                # mean "not ready yet". ValueError is caught explicitly because
                # older requests releases raise a plain JSON decode error
                # (a ValueError) from Response.json() rather than a
                # RequestException.
                pass
            time.sleep(0.5)
    raise TimeoutError(
        f"Expected {expected_count} workers at {base_url}, timed out after {timeout}s"
    )


32
@pytest.mark.e2e
def test_mmlu(e2e_router_only_rr, e2e_two_workers_dp2, e2e_model):
    """Attach the dp=2 workers to a router-only instance and check MMLU score."""
    router_url = e2e_router_only_rr.url
    workers_endpoint = f"{router_url}/workers"

    # Attach two dp=2 workers (total 4 GPUs) to a fresh router-only instance
    for worker in e2e_two_workers_dp2:
        resp = requests.post(workers_endpoint, json={"url": worker.url}, timeout=180)
        assert (
            resp.status_code == 202
        ), f"Expected 202 ACCEPTED, got {resp.status_code}: {resp.text}"

    # Registration is acknowledged with 202, so poll until both workers show up
    _wait_for_workers(router_url, expected_count=2, timeout=60.0)

    eval_args = SimpleNamespace(
        base_url=router_url,
        model=e2e_model,
        eval_name="mmlu",
        num_examples=64,
        num_threads=32,
        temperature=0.1,
    )
    score = run_eval(eval_args)["score"]
    assert score >= 0.65


57
58
59
60
61
62
63
@pytest.mark.e2e
def test_genai_bench(
    e2e_router_only_rr, e2e_two_workers_dp2, e2e_model, genai_bench_runner
):
    """Attach the workers to the regular router and run a short genai-bench."""
    router_url = e2e_router_only_rr.url
    workers_endpoint = f"{router_url}/workers"

    for worker in e2e_two_workers_dp2:
        resp = requests.post(workers_endpoint, json={"url": worker.url}, timeout=180)
        assert (
            resp.status_code == 202
        ), f"Expected 202 ACCEPTED, got {resp.status_code}: {resp.text}"

    # Registration is acknowledged with 202, so poll until both workers show up
    _wait_for_workers(router_url, expected_count=2, timeout=60.0)

    # Pass/fail limits handed to the benchmark runner
    bench_thresholds = {
        "ttft_mean_max": 6,
        "e2e_latency_mean_max": 14,
        "input_throughput_mean_min": 800,  # temp relax from 1000 to 800 for now
        "output_throughput_mean_min": 12,
        # Enforce GPU utilization p50 >= 99% during the run.
        "gpu_util_p50_min": 99,
    }

    genai_bench_runner(
        router_url=router_url,
        model_path=e2e_model,
        experiment_folder="benchmark_round_robin_regular",
        thresholds=bench_thresholds,
        kill_procs=e2e_two_workers_dp2,
    )


88
89
90
91
92
@pytest.mark.e2e
def test_add_and_remove_worker_live(e2e_router_only_rr, e2e_primary_worker, e2e_model):
    """Add a worker, send a few completions through the router, then remove it."""
    from urllib.parse import quote

    router_url = e2e_router_only_rr.url
    target_url = e2e_primary_worker.url

    add_resp = requests.post(
        f"{router_url}/workers", json={"url": target_url}, timeout=180
    )
    assert (
        add_resp.status_code == 202
    ), f"Expected 202 ACCEPTED, got {add_resp.status_code}: {add_resp.text}"

    # Registration is acknowledged with 202, so poll until the worker shows up
    _wait_for_workers(router_url, expected_count=1, timeout=60.0)

    # Send eight single-token completions over one shared HTTP session
    prompts = [f"x{i}" for i in range(8)]
    with requests.Session() as session:
        for prompt in prompts:
            payload = {
                "model": e2e_model,
                "prompt": prompt,
                "max_tokens": 1,
                "stream": False,
            }
            completion = session.post(
                f"{router_url}/v1/completions", json=payload, timeout=120
            )
            completion.raise_for_status()

    # Remove the worker; its URL is percent-encoded (including "/" and ":")
    # so it fits in a single path segment
    delete_endpoint = f"{router_url}/workers/{quote(target_url, safe='')}"
    del_resp = requests.delete(delete_endpoint, timeout=60)
    assert (
        del_resp.status_code == 202
    ), f"Expected 202 ACCEPTED, got {del_resp.status_code}: {del_resp.text}"
119
120
121
122
123
124
125


@pytest.mark.e2e
def test_lazy_fault_tolerance_live(e2e_router_only_rr, e2e_primary_worker, e2e_model):
    """Run an MMLU eval while the only attached worker is terminated mid-run.

    A background thread terminates the worker process 10 seconds after the
    worker is registered. The test only requires that the eval completes and
    yields a score in ``[0.0, 1.0]``.
    """
    base = e2e_router_only_rr.url
    worker = e2e_primary_worker

    r = requests.post(f"{base}/workers", json={"url": worker.url}, timeout=180)
    assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"

    # Wait for worker to be registered
    _wait_for_workers(base, expected_count=1, timeout=60.0)

    def killer():
        # Give the eval time to start sending traffic before killing the worker
        time.sleep(10)
        try:
            worker.proc.terminate()
        except Exception:
            # Best effort: a failure to terminate must not fail the test
            pass

    t = threading.Thread(target=killer, daemon=True)
    t.start()

    args = SimpleNamespace(
        base_url=base,
        model=e2e_model,
        eval_name="mmlu",
        num_examples=32,
        num_threads=16,
        temperature=0.0,
    )
    try:
        metrics = run_eval(args)
    finally:
        # Do not let the killer outlive the test: if the eval returns before
        # the 10s delay elapses, the thread would otherwise terminate the
        # worker after this test has already finished.
        t.join()
    assert 0.0 <= metrics["score"] <= 1.0


@pytest.mark.e2e
def test_dp_aware_worker_expansion_and_api_key(
    e2e_model,
    e2e_router_only_rr_dp_aware_api,
    e2e_worker_dp2_api,
):
    """
    Launch a router-only instance in dp_aware mode and a single worker with dp_size=2
    and API key protection. Verify expansion, auth enforcement, and basic eval.

    The ``OPENAI_API_KEY`` environment variable is set only for the duration
    of the eval and restored afterwards, so the key does not leak into other
    tests running in the same process.
    """
    import os

    router_url = e2e_router_only_rr_dp_aware_api.url
    worker_url = e2e_worker_dp2_api.url
    api_key = e2e_router_only_rr_dp_aware_api.api_key
    auth_headers = {"Authorization": f"Bearer {api_key}"}

    # Attach worker; router should expand to dp_size logical workers
    r = requests.post(
        f"{router_url}/workers",
        json={"url": worker_url, "api_key": api_key},
        headers=auth_headers,
        timeout=180,
    )
    assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"

    # Wait for workers to be registered and expanded
    _wait_for_workers(
        router_url,
        expected_count=2,
        timeout=60.0,
        headers=auth_headers,
    )

    # Verify the expanded workers have correct URLs
    r = requests.get(
        f"{router_url}/workers",
        headers=auth_headers,
        timeout=30,
    )
    r.raise_for_status()
    workers = r.json().get("workers", [])
    urls = [w["url"] for w in workers]
    assert len(urls) == 2
    assert set(urls) == {f"{worker_url}@0", f"{worker_url}@1"}

    # Verify API key enforcement
    # 1) Without Authorization -> Should get 401 Unauthorized
    r = requests.post(
        f"{router_url}/v1/completions",
        json={"model": e2e_model, "prompt": "hi", "max_tokens": 1},
        timeout=60,
    )
    assert r.status_code == 401

    # 2) With correct Authorization -> 200
    r = requests.post(
        f"{router_url}/v1/completions",
        json={"model": e2e_model, "prompt": "hi", "max_tokens": 1},
        headers=auth_headers,
        timeout=60,
    )
    assert r.status_code == 200

    # Finally, run MMLU eval through the router with auth
    args = SimpleNamespace(
        base_url=router_url,
        model=e2e_model,
        eval_name="mmlu",
        num_examples=64,
        num_threads=32,
        temperature=0.1,
    )
    # Remember the caller's value so the process environment can be restored
    previous_key = os.environ.get("OPENAI_API_KEY")
    os.environ["OPENAI_API_KEY"] = api_key
    try:
        metrics = run_eval(args)
    finally:
        if previous_key is None:
            os.environ.pop("OPENAI_API_KEY", None)
        else:
            os.environ["OPENAI_API_KEY"] = previous_key
    assert metrics["score"] >= 0.65