test_regular_router.py 4.02 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import threading
import time
from types import SimpleNamespace

import pytest
import requests

from sglang.test.run_eval import run_eval


@pytest.mark.e2e
def test_mmlu(e2e_router_only_rr, e2e_primary_worker, e2e_model):
    """Register the primary worker with the router and check the MMLU score.

    The worker is attached through the router's ``/add_worker`` endpoint, then
    a 64-example MMLU evaluation is sent to the router URL. The test passes
    when the reported ``score`` is at least 0.65.
    """
    router_url = e2e_router_only_rr.url

    # Registration has to succeed before any eval traffic is sent.
    requests.post(
        f"{router_url}/add_worker",
        params={"url": e2e_primary_worker.url},
        timeout=180,
    ).raise_for_status()

    eval_settings = {
        "base_url": router_url,
        "model": e2e_model,
        "eval_name": "mmlu",
        "num_examples": 64,
        "num_threads": 32,
        "temperature": 0.1,
    }
    score = run_eval(SimpleNamespace(**eval_settings))["score"]
    assert score >= 0.65


@pytest.mark.e2e
def test_add_and_remove_worker_live(e2e_router_only_rr, e2e_primary_worker, e2e_model):
    """Add a worker, push a few completions through the router, then remove it.

    Every HTTP call must return a non-error status: the ``/add_worker``
    registration, eight single-token ``/v1/completions`` requests sent over
    one shared session, and the final ``/remove_worker`` call.
    """
    router_url = e2e_router_only_rr.url
    target_worker = e2e_primary_worker.url

    requests.post(
        f"{router_url}/add_worker", params={"url": target_worker}, timeout=180
    ).raise_for_status()

    completions_endpoint = f"{router_url}/v1/completions"
    with requests.Session() as session:
        for idx in range(8):
            # Distinct prompts (x0..x7), one generated token each.
            payload = {
                "model": e2e_model,
                "prompt": f"x{idx}",
                "max_tokens": 1,
                "stream": False,
            }
            session.post(
                completions_endpoint, json=payload, timeout=120
            ).raise_for_status()

    # Detach the worker again.
    requests.post(
        f"{router_url}/remove_worker", params={"url": target_worker}, timeout=60
    ).raise_for_status()


@pytest.mark.e2e
def test_lazy_fault_tolerance_live(e2e_router_only_rr, e2e_primary_worker, e2e_model):
    """Run an MMLU eval through the router while its worker gets terminated.

    The worker is attached via ``/add_worker``. A background thread then
    terminates the worker process 10 seconds later, while a 32-example MMLU
    eval is being sent through the router. The test only requires that
    ``run_eval`` returns and reports a ``score`` within ``[0.0, 1.0]``.
    """
    base = e2e_router_only_rr.url
    worker = e2e_primary_worker

    r = requests.post(f"{base}/add_worker", params={"url": worker.url}, timeout=180)
    r.raise_for_status()

    def killer():
        # Give the eval time to get requests in flight before the fault.
        time.sleep(10)
        try:
            worker.proc.terminate()
        except Exception:
            # Deliberately best effort: a failed terminate (for example the
            # process is already gone) must not add noise to the test run.
            pass

    t = threading.Thread(target=killer, daemon=True)
    t.start()

    try:
        args = SimpleNamespace(
            base_url=base,
            model=e2e_model,
            eval_name="mmlu",
            num_examples=32,
            num_threads=16,
            temperature=0.0,
        )
        metrics = run_eval(args)
    finally:
        # Never let the killer outlive this test. If the eval finishes (or
        # fails) in under 10 seconds, an unjoined thread would terminate the
        # worker process later, possibly in the middle of another test that
        # uses the same worker fixture.
        t.join(timeout=30)

    assert 0.0 <= metrics["score"] <= 1.0


@pytest.mark.e2e
def test_dp_aware_worker_expansion_and_api_key(
    e2e_model,
    e2e_router_only_rr_dp_aware_api,
    e2e_worker_dp2_api,
):
    """
    Launch a router-only instance in dp_aware mode and a single worker with dp_size=2
    and API key protection. Verify expansion, auth enforcement, and basic eval.

    The ``OPENAI_API_KEY`` environment variable is set only for the duration
    of the eval and restored afterwards, so the key does not leak into other
    tests running in the same process.
    """
    import os

    router_url = e2e_router_only_rr_dp_aware_api.url
    worker_url = e2e_worker_dp2_api.url
    api_key = e2e_router_only_rr_dp_aware_api.api_key

    # Attach worker; router should expand to dp_size logical workers
    r = requests.post(
        f"{router_url}/add_worker", params={"url": worker_url}, timeout=180
    )
    r.raise_for_status()

    r = requests.get(f"{router_url}/list_workers", timeout=30)
    r.raise_for_status()
    urls = r.json().get("urls", [])
    # The length check catches duplicate entries that the set comparison
    # alone would hide.
    assert len(urls) == 2
    assert set(urls) == {f"{worker_url}@0", f"{worker_url}@1"}

    # Verify API key enforcement pass-through
    # 1) Without Authorization -> 401 from backend
    r = requests.post(
        f"{router_url}/v1/completions",
        json={"model": e2e_model, "prompt": "hi", "max_tokens": 1},
        timeout=60,
    )
    assert r.status_code == 401

    # 2) With correct Authorization -> 200
    r = requests.post(
        f"{router_url}/v1/completions",
        json={"model": e2e_model, "prompt": "hi", "max_tokens": 1},
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=60,
    )
    assert r.status_code == 200

    # Finally, run MMLU eval through the router with auth.
    # NOTE(review): presumably the eval's client reads the key from
    # OPENAI_API_KEY - confirm against run_eval.
    previous_key = os.environ.get("OPENAI_API_KEY")
    os.environ["OPENAI_API_KEY"] = api_key
    try:
        args = SimpleNamespace(
            base_url=router_url,
            model=e2e_model,
            eval_name="mmlu",
            num_examples=64,
            num_threads=32,
            temperature=0.1,
        )
        metrics = run_eval(args)
    finally:
        # Put the process environment back exactly as it was found.
        if previous_key is None:
            os.environ.pop("OPENAI_API_KEY", None)
        else:
            os.environ["OPENAI_API_KEY"] = previous_key

    assert metrics["score"] >= 0.65