# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Timing notes (measured in an SGLang-enabled container):
# - GPU-1 subset (`-m "gpu_1"`): 92.35s total for 2 tests (+ 1 skipped).
# These tests load a real model and can be slow or flaky when GPU resources are
# contended, so explicit pytest timeouts are set to fail fast on hangs (see the
# per-test markers below).

import logging
import os
from typing import Any, Dict, Optional

import pytest

from tests.router.e2e_harness import (
    ManagedEngineProcessMixin,
    run_basic_router_test,
    run_indexers_sync_test,
    run_router_decisions_test,
)
from tests.router.helper import generate_random_suffix
from tests.utils.constants import DefaultPort
from tests.utils.managed_process import ManagedProcess
from tests.utils.port_utils import allocate_ports, deallocate_ports

logger = logging.getLogger(__name__)

# Small 2-layer model so that several workers can share one GPU.
MODEL_NAME = "silence09/DeepSeek-R1-Small-2layers"

pytestmark = [
    pytest.mark.e2e,
    pytest.mark.router,
    pytest.mark.sglang,
    pytest.mark.model(MODEL_NAME),
]

PAGE_SIZE = 16  # SGLang uses "page_size" instead of "block_size"

# Shared SGLang configuration for all tests.
# mem_fraction_static limits actual VRAM allocation (required for multi-worker on same GPU).
SGLANG_ARGS: Dict[str, Any] = {
    "page_size": PAGE_SIZE,
    "model": MODEL_NAME,
    # Limit VRAM allocation per worker (equivalent to vLLM's gpu_memory_utilization).
    "mem_fraction_static": 0.4,
    # Limit context length to reduce KV cache size (equivalent to vLLM's max_model_len).
    "context_length": 1024,
    # Disable CUDA graphs for faster startup & lower memory (equivalent to vLLM's enforce_eager).
    "disable_cuda_graph": True,
}


class SGLangProcess(ManagedEngineProcessMixin):
    """Manages SGLang workers using dynamo.sglang (HTTP API + KV events).

    This is a drop-in replacement for MockerProcess that uses real SGLang workers.
    The key difference: dynamo.sglang automatically handles:
    - HTTP API serving
    - KV cache event publishing (ZMQ → NATS bridge)
    - Integration with dynamo.frontend router

    The constructor only builds one ``ManagedProcess`` per worker and stores
    them in ``self.worker_processes``; it does not start them. Starting and
    stopping is presumably handled by ``ManagedEngineProcessMixin`` (defined in
    ``tests.router.e2e_harness``) -- verify there.
    """

    # Human-readable labels read by ManagedEngineProcessMixin (assumed to be
    # used in its log/cleanup messages -- confirm in tests.router.e2e_harness).
    process_name = "SGLang worker"
    cleanup_name = "SGLang worker resources"

    def __init__(
        self,
        request,
        sglang_args: Optional[Dict[str, Any]] = None,
        num_workers: int = 2,
        single_gpu: bool = False,
        data_parallel_size: Optional[int] = None,
        request_plane: str = "tcp",
        store_backend: str = "etcd",
        durable_kv_events: bool = False,
    ):
        """Initialize SGLang workers with dynamo integration.

        Args:
            request: pytest ``request`` fixture. ``request.node.name`` is used
                as the log directory and ``request.addfinalizer`` releases the
                ports allocated here.
            sglang_args: Configuration dict with keys:
                - page_size: KV cache page size (default: PAGE_SIZE = 16)
                - model: Model name/path (default: MODEL_NAME)
                - mem_fraction_static: Fraction of GPU memory to allocate (optional)
                - context_length: Maximum sequence length (optional)
                - disable_cuda_graph: Disable CUDA graphs (default: False)
            num_workers: Number of SGLang worker processes to create.
            single_gpu: If True, every worker gets CUDA_VISIBLE_DEVICES=0. This
                takes precedence over the DP GPU layout described below.
            data_parallel_size: If set, each worker is launched with
                ``--dp-size`` and ``--tp-size`` equal to this value plus
                ``--enable-dp-attention``. Unless ``single_gpu`` is set, worker
                ``i`` sees GPUs ``[i * dp, (i + 1) * dp)``, so
                ``num_workers * data_parallel_size`` GPUs are required.
            request_plane: Request plane to use ("nats", "tcp", or "http"),
                exported to workers as DYN_REQUEST_PLANE. Defaults to "tcp".
            store_backend: Storage backend to use ("etcd" or "file"). With
                "file", DYN_FILE_KV is forwarded from the parent environment.
                Defaults to "etcd".
            durable_kv_events: If True, pass ``--durable-kv-events`` (JetStream
                for durable KV events). Defaults to False (NATS Core mode).
        """
        # Unique namespace per instance so parallel test runs stay isolated.
        namespace_suffix = generate_random_suffix()
        self.namespace = f"test-namespace-{namespace_suffix}"
        self.component_name = "backend"
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_workers = num_workers
        self.data_parallel_size = data_parallel_size
        self.worker_processes = []
        self.store_backend = store_backend

        # Dynamically allocate unique system and KV-event ports (one of each per
        # worker) to avoid conflicts in parallel (xdist) runs. Each pool gets
        # its own finalizer right after allocation, so the system ports are
        # still released if the KV-event allocation raises.
        # NOTE(review): both pools start searching at DefaultPort.SYSTEM1; this
        # relies on allocate_ports never returning a still-reserved port --
        # confirm in tests.utils.port_utils.
        system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        request.addfinalizer(lambda: deallocate_ports(system_ports))
        kv_event_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        request.addfinalizer(lambda: deallocate_ports(kv_event_ports))
        self._system_ports = system_ports
        self._kv_event_ports = kv_event_ports

        if sglang_args is None:
            sglang_args = {}

        page_size = sglang_args.get("page_size", PAGE_SIZE)
        model = sglang_args.get("model", MODEL_NAME)
        mem_fraction_static = sglang_args.get("mem_fraction_static")
        context_length = sglang_args.get("context_length")
        disable_cuda_graph = sglang_args.get("disable_cuda_graph", False)

        self.model_name = model

        for worker_idx in range(num_workers):
            # Compute CUDA_VISIBLE_DEVICES for this worker.
            if single_gpu:
                # Force all processes onto GPU 0 (single-GPU testing).
                gpu_device = "0"
            elif data_parallel_size is not None:
                # Worker i sees a contiguous slice of `data_parallel_size` GPUs,
                # one per DP rank.
                first_gpu = worker_idx * data_parallel_size
                gpu_device = ",".join(
                    str(gpu)
                    for gpu in range(first_gpu, first_gpu + data_parallel_size)
                )
            else:
                # No DP: worker i sees GPU i.
                gpu_device = str(worker_idx)

            command = [
                "python3",
                "-m",
                "dynamo.sglang",
                "--model-path",
                model,
                "--page-size",
                str(page_size),
            ]

            # Disable CUDA graphs for faster startup & lower memory.
            if disable_cuda_graph:
                command.append("--disable-cuda-graph")

            # Limit VRAM allocation (required for multi-worker on same GPU).
            if mem_fraction_static is not None:
                command.extend(["--mem-fraction-static", str(mem_fraction_static)])

            if context_length is not None:
                command.extend(["--context-length", str(context_length)])

            if data_parallel_size is not None:
                # DP-attention layout: tp-size is set equal to dp-size.
                command.extend(
                    [
                        "--dp-size",
                        str(data_parallel_size),
                        "--tp-size",
                        str(data_parallel_size),
                        "--enable-dp-attention",
                    ]
                )

            # Per-worker ZMQ KV-event publisher bound to its own allocated port.
            kv_events_port = self._kv_event_ports[worker_idx]
            kv_events_config = f'{{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:{kv_events_port}"}}'
            command.extend(["--kv-events-config", kv_events_config])

            # --durable-kv-events enables JetStream mode (local indexer disabled).
            if durable_kv_events:
                command.append("--durable-kv-events")

            # Each worker needs a unique DYN_SYSTEM_PORT to avoid conflicts.
            system_port = self._system_ports[worker_idx]

            env = os.environ.copy()
            env_vars = {
                "CUDA_VISIBLE_DEVICES": gpu_device,
                "DYN_NAMESPACE": self.namespace,
                "DYN_REQUEST_PLANE": request_plane,
                "DYN_SYSTEM_PORT": str(system_port),
                "PYTHONHASHSEED": "0",  # for deterministic event id's
            }

            # The environment copy above already carries DYN_FILE_KV; setting it
            # explicitly documents that the file backend depends on it.
            if self.store_backend == "file" and "DYN_FILE_KV" in os.environ:
                env_vars["DYN_FILE_KV"] = os.environ["DYN_FILE_KV"]

            env.update(env_vars)

            process = ManagedProcess(
                command=command,
                env=env,
                timeout=120,  # Allow time for model loading
                display_output=True,
                health_check_ports=[],
                health_check_urls=[],
                log_dir=request.node.name,
                terminate_all_matching_process_names=False,
            )
            self.worker_processes.append(process)

            if data_parallel_size is not None:
                logger.info(
                    "Created %s DP ranks per worker on GPU(s) %s "
                    "(mem_frac=%s, system_port=%s, kv_port=%s) "
                    "with endpoint: %s",
                    data_parallel_size,
                    gpu_device,
                    mem_fraction_static,
                    system_port,
                    kv_events_port,
                    self.endpoint,
                )
            else:
                logger.info(
                    "Created SGLang worker %s on GPU %s "
                    "(mem_frac=%s, system_port=%s, kv_port=%s) "
                    "with endpoint: %s",
                    worker_idx,
                    gpu_device,
                    mem_fraction_static,
                    system_port,
                    kv_events_port,
                    self.endpoint,
                )


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(150)  # ~3x average (~46s/test), rounded up
def test_sglang_kv_router_basic(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared basic KV-router e2e flow against two SGLang workers on GPU 0.

    The fixtures other than ``request``/``request_plane`` are requested for their
    setup side effects only (they are defined outside this file).
    """
    run_basic_router_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        num_workers=2,
        single_gpu=True,
        request=request,
        request_plane=request_plane,
        block_size=PAGE_SIZE,
        model_name=MODEL_NAME,
    )


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
# Same budget as the other single-GPU, two-worker tests in this file so a hang
# fails fast. TODO: re-measure this test's average runtime and adjust.
@pytest.mark.timeout(150)
def test_router_decisions_sglang_multiple_workers(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared router-decisions e2e flow with two SGLang workers on GPU 0.

    DP-rank routing is not exercised here (``test_dp_rank=False``); the fixtures
    other than ``request``/``request_plane`` are requested for setup side effects.
    """
    run_router_decisions_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        component_name="backend",
        num_workers=2,
        single_gpu=True,
        test_dp_rank=False,
    )


@pytest.mark.gpu_2
@pytest.mark.pre_merge
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(600)  # 10 min max (multi-GPU + DP startup variance)
# NOTE(review): the original explanation blamed SGLang startup with multiple
# workers on the same GPU, but this test runs a single worker with DP=2 across
# two GPUs (single_gpu=False). Confirm the actual root cause tracked in DYN-2265
# before re-enabling.
@pytest.mark.skip(reason="DYN-2265")
def test_router_decisions_sglang_dp(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Validate KV cache prefix reuse with SGLang by sending progressive requests with overlapping prefixes.

    Same flow as test_router_decisions_sglang_multiple_workers; force first request to (worker_id, dp_rank=1).
    Dump events from router and verify:
        * All but one (worker_id, dp_rank) should have no events (due to prefix reuse)
        * The (worker_id, dp_rank) with events should have exactly 4 events (one per request)
        * All events should be on the forced (worker_id, dp_rank=1) (verifying forced routing and prefix reuse)
    """
    run_router_decisions_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        component_name="backend",
        num_workers=1,
        single_gpu=False,
        test_dp_rank=True,
        # One worker spanning two GPUs, one per DP rank.
        extra_process_kwargs={"data_parallel_size": 2},
    )


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize(
    "store_backend,durable_kv_events,request_plane",
    [
        ("etcd", False, "tcp"),
    ],
    ids=["nats_core"],
    indirect=["durable_kv_events", "request_plane"],
)
@pytest.mark.timeout(150)  # ~3x average (~46s/test), rounded up
def test_sglang_indexers_sync(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    file_storage_backend,
    set_ucx_tls_no_mm,
    store_backend,
    durable_kv_events,
    request_plane,
):
    """Run the shared indexer-sync e2e flow with two SGLang workers.

    Parametrized for the etcd store with non-durable KV events ("nats_core").
    ``store_backend`` is passed directly; ``durable_kv_events`` and
    ``request_plane`` go through indirect fixtures defined outside this file.
    """
    run_indexers_sync_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        request=request,
        runtime_services_dynamic_ports=runtime_services_dynamic_ports,
        store_backend=store_backend,
        durable_kv_events=durable_kv_events,
        request_plane=request_plane,
        block_size=PAGE_SIZE,
        model_name=MODEL_NAME,
        num_workers=2,
    )