test_router_e2e_with_sglang.py 14.7 KB
Newer Older
1
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
# SPDX-License-Identifier: Apache-2.0
3
4
5
6
7

# Timing notes (measured in an SGLang-enabled container):
# - GPU-1 subset (`-m "gpu_1"`): 92.35s total for 2 tests (+ 1 skipped).
# These tests load a real model and can be slow/flaky when GPU resources are contended,
# so we set explicit pytest timeouts to fail fast on hangs (see per-test markers below).
8
9
10
11
12
13
import logging
import os
from typing import Any, Dict, Optional

import pytest

14
15
16
from tests.router.e2e_harness import (
    ManagedEngineProcessMixin,
    run_basic_router_test,
17
    run_disagg_router_decisions_test,
18
19
    run_indexers_sync_test,
    run_router_decisions_test,
20
)
21
from tests.router.helper import generate_random_suffix
22
from tests.utils.constants import DefaultPort
23
from tests.utils.managed_process import ManagedProcess
24
from tests.utils.port_utils import allocate_ports, deallocate_ports
25
26
27

logger = logging.getLogger(__name__)

# Model and KV page size shared by every test in this module.
MODEL_NAME = "silence09/DeepSeek-R1-Small-2layers"
PAGE_SIZE = 16  # SGLang uses "page_size" instead of "block_size"

# Markers applied to every test collected from this module.
pytestmark = [
    pytest.mark.e2e,
    pytest.mark.router,
    pytest.mark.sglang,
    pytest.mark.model(MODEL_NAME),
]

# Shared SGLang configuration for all tests.
# mem_fraction_static limits actual VRAM allocation (required for multi-worker on same GPU).
SGLANG_ARGS: Dict[str, Any] = {
    "page_size": PAGE_SIZE,
    "model": MODEL_NAME,
    # Limit VRAM allocation per worker (equivalent to vLLM's gpu_memory_utilization).
    "mem_fraction_static": 0.4,
    # Limit context length to reduce KV cache size (equivalent to vLLM's max_model_len).
    "context_length": 1024,
    # Disable CUDA graphs for faster startup & lower memory (equivalent to vLLM's enforce_eager).
    "disable_cuda_graph": True,
}


49
def _sglang_visible_gpus(
    worker_idx: int,
    single_gpu: bool,
    data_parallel_size: Optional[int],
    gpu_start_index: int,
) -> str:
    """Return the CUDA_VISIBLE_DEVICES value for the worker at ``worker_idx``."""
    if single_gpu:
        # Every worker shares the one GPU at gpu_start_index.
        return str(gpu_start_index)
    if data_parallel_size is not None:
        # Each worker gets its own run of data_parallel_size consecutive GPUs.
        first_gpu = gpu_start_index + worker_idx * data_parallel_size
        return ",".join(
            str(gpu) for gpu in range(first_gpu, first_gpu + data_parallel_size)
        )
    # No DP: one GPU per worker, counted up from gpu_start_index.
    return str(gpu_start_index + worker_idx)


def _sglang_worker_command(
    model: str,
    page_size: int,
    kv_events_port: int,
    disable_cuda_graph: bool = False,
    mem_fraction_static: Optional[float] = None,
    context_length: Optional[int] = None,
    disaggregation_mode: Optional[str] = None,
    data_parallel_size: Optional[int] = None,
    durable_kv_events: bool = False,
) -> list:
    """Assemble the ``python3 -m dynamo.sglang`` argv for a single worker."""
    command = [
        "python3",
        "-m",
        "dynamo.sglang",
        "--model-path",
        model,
        "--page-size",
        str(page_size),
    ]

    # Disable CUDA graphs for faster startup & lower memory.
    # sglang 0.5.10+ has piecewise CUDA graphs (separate flag) that
    # consume ~7 GB during capture — must also be disabled for
    # multi-worker same-GPU tests to avoid OOM.
    if disable_cuda_graph:
        command.append("--disable-cuda-graph")
        command.append("--disable-piecewise-cuda-graph")

    # Limit VRAM allocation (required for multi-worker on same GPU).
    if mem_fraction_static is not None:
        command.extend(["--mem-fraction-static", str(mem_fraction_static)])

    if context_length is not None:
        command.extend(["--context-length", str(context_length)])

    if disaggregation_mode is not None:
        command.extend(["--disaggregation-mode", disaggregation_mode])
        command.extend(["--disaggregation-transfer-backend", "nixl"])

    if data_parallel_size is not None:
        # DP attention: dp-size and tp-size are both set to data_parallel_size.
        command.extend(
            [
                "--dp-size",
                str(data_parallel_size),
                "--tp-size",
                str(data_parallel_size),
                "--enable-dp-attention",
            ]
        )

    # Per-worker KV events config for ZMQ publishing.
    kv_events_config = f'{{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:{kv_events_port}"}}'
    command.extend(["--kv-events-config", kv_events_config])

    # --durable-kv-events enables JetStream mode (local indexer disabled).
    if durable_kv_events:
        command.append("--durable-kv-events")

    return command


class SGLangProcess(ManagedEngineProcessMixin):
    """Manages SGLang workers using dynamo.sglang (HTTP API + KV events).

    This is a drop-in replacement for MockerProcess that uses real SGLang workers.
    The key difference: dynamo.sglang automatically handles:
    - HTTP API serving
    - KV cache event publishing (ZMQ → NATS bridge)
    - Integration with dynamo.frontend router

    The constructor only builds one ``ManagedProcess`` per worker and stores
    them in ``self.worker_processes``; it does not start them itself.
    """

    process_name = "SGLang worker"
    cleanup_name = "SGLang worker resources"

    def __init__(
        self,
        request,
        sglang_args: Optional[Dict[str, Any]] = None,
        num_workers: int = 2,
        single_gpu: bool = False,
        data_parallel_size: Optional[int] = None,
        request_plane: str = "tcp",
        store_backend: str = "etcd",
        durable_kv_events: bool = False,
        namespace: Optional[str] = None,
        gpu_start_index: int = 0,
        disaggregation_mode: Optional[str] = None,
    ):
        """Initialize SGLang workers with dynamo integration.

        Args:
            request: pytest request fixture; used to register port cleanup and
                to name the log directory (``request.node.name``).
            sglang_args: Configuration dict with keys:
                - page_size: KV cache page size (default: PAGE_SIZE)
                - model: Model name/path (default: MODEL_NAME)
                - mem_fraction_static: Fraction of GPU memory to allocate (optional)
                - context_length: Maximum sequence length (optional)
                - disable_cuda_graph: Disable CUDA graphs (default: False)
            num_workers: Number of SGLang worker processes.
            single_gpu: If True, all workers share the GPU at ``gpu_start_index``.
            data_parallel_size: If set (and ``single_gpu`` is False), each worker
                is given ``data_parallel_size`` consecutive GPUs. Whenever it is
                set, the worker is launched with ``--dp-size`` and ``--tp-size``
                equal to it plus ``--enable-dp-attention``.
            request_plane: Request plane to use ("nats", "tcp", or "http"). Defaults to "tcp".
            store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
            durable_kv_events: If True, use JetStream for durable KV events.
                Defaults to False (NATS Core mode).
            namespace: Namespace to use; when omitted, a unique
                ``test-namespace-<random suffix>`` is generated for isolation.
            gpu_start_index: Index of the first GPU used by this group of workers.
            disaggregation_mode: If set, passed as ``--disaggregation-mode`` with
                the "nixl" transfer backend. "prefill" selects the "prefill"
                component name; any other value uses "backend".
        """
        # Only generate a random suffix when the caller did not pin a namespace.
        self.namespace = namespace or f"test-namespace-{generate_random_suffix()}"
        self.component_name = (
            "prefill" if disaggregation_mode == "prefill" else "backend"
        )
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_workers = num_workers
        self.data_parallel_size = data_parallel_size
        self.worker_processes = []
        self.store_backend = store_backend

        # Dynamically allocate unique system and KV event ports (one per worker)
        # to avoid conflicts in parallel test runs. Each batch gets its finalizer
        # as soon as it is allocated, so a failure in the second allocation does
        # not leak the first batch.
        self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        request.addfinalizer(lambda: deallocate_ports(self._system_ports))
        self._kv_event_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        request.addfinalizer(lambda: deallocate_ports(self._kv_event_ports))

        if sglang_args is None:
            sglang_args = {}

        page_size = sglang_args.get("page_size", PAGE_SIZE)
        model = sglang_args.get("model", MODEL_NAME)
        mem_fraction_static = sglang_args.get("mem_fraction_static")
        context_length = sglang_args.get("context_length")
        disable_cuda_graph = sglang_args.get("disable_cuda_graph", False)

        self.model_name = model

        for worker_idx in range(num_workers):
            gpu_device = _sglang_visible_gpus(
                worker_idx, single_gpu, data_parallel_size, gpu_start_index
            )

            # Ports are dynamically allocated for xdist-safe parallel execution.
            kv_events_port = self._kv_event_ports[worker_idx]
            # Each SGLang worker needs a unique DYN_SYSTEM_PORT to avoid conflicts.
            system_port = self._system_ports[worker_idx]

            command = _sglang_worker_command(
                model,
                page_size,
                kv_events_port,
                disable_cuda_graph=disable_cuda_graph,
                mem_fraction_static=mem_fraction_static,
                context_length=context_length,
                disaggregation_mode=disaggregation_mode,
                data_parallel_size=data_parallel_size,
                durable_kv_events=durable_kv_events,
            )

            env = os.environ.copy()  # Copy parent environment
            env_vars = {
                "CUDA_VISIBLE_DEVICES": gpu_device,
                "DYN_NAMESPACE": self.namespace,
                "DYN_REQUEST_PLANE": request_plane,
                "DYN_SYSTEM_PORT": str(system_port),
                "PYTHONHASHSEED": "0",  # for deterministic event id's
            }

            # Add DYN_FILE_KV if using file storage backend.
            # NOTE(review): env is already a copy of os.environ, so this value is
            # inherited either way; kept explicit to preserve the original intent.
            if self.store_backend == "file" and "DYN_FILE_KV" in os.environ:
                env_vars["DYN_FILE_KV"] = os.environ["DYN_FILE_KV"]

            env.update(env_vars)

            # Create managed process for the worker
            process = ManagedProcess(
                command=command,
                env=env,
                timeout=120,  # Allow time for model loading
                display_output=True,
                health_check_ports=[],
                health_check_urls=[],
                log_dir=request.node.name,
                terminate_all_matching_process_names=False,
            )
            self.worker_processes.append(process)

            if data_parallel_size is not None:
                logger.info(
                    f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} "
                    f"(mem_frac={mem_fraction_static}, system_port={system_port}, kv_port={kv_events_port}) "
                    f"with endpoint: {self.endpoint}"
                )
            else:
                logger.info(
                    f"Created SGLang worker {worker_idx} on GPU {gpu_device} "
                    f"(mem_frac={mem_fraction_static}, system_port={system_port}, kv_port={kv_events_port}) "
                    f"with endpoint: {self.endpoint}"
                )
237
238
239
240


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(150)  # ~3x average (~46s/test), rounded up
def test_sglang_kv_router_basic(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared basic router scenario with two SGLang workers on one GPU.

    The fixtures that are not referenced in the body are requested only so
    that pytest sets them up before the harness runs.
    """
    run_basic_router_test(
        # Which engine to launch and how to configure it.
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        # Topology: two workers sharing a single GPU.
        num_workers=2,
        single_gpu=True,
        # Test plumbing.
        request=request,
        request_plane=request_plane,
    )
261
262
263
264


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
# Explicit timeout so a hung model load fails fast, matching the other gpu_1
# tests in this module (~3x average of ~46s/test, rounded up).
@pytest.mark.timeout(150)
def test_router_decisions_sglang_multiple_workers(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared router-decisions scenario with two SGLang workers on one GPU.

    DP-rank checks are disabled (``test_dp_rank=False``). The fixtures that are
    not referenced in the body are requested only so that pytest sets them up
    before the harness runs.
    """
    run_router_decisions_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        component_name="backend",
        num_workers=2,
        single_gpu=True,
        test_dp_rank=False,
    )
286
287
288


@pytest.mark.gpu_2
@pytest.mark.pre_merge
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(600)  # 10 min max (multi-GPU + DP startup variance)
# Currently fails probably due to SGLang startup issues when multiple workers
# on same GPU; re-enable when fixed.
@pytest.mark.skip(reason="DYN-2265")
def test_router_decisions_sglang_dp(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Validate KV cache prefix reuse with SGLang under data parallelism.

    Sends progressive requests with overlapping prefixes, following the same
    flow as test_router_decisions_sglang_multiple_workers, with the first
    request forced to (worker_id, dp_rank=1). Events are then dumped from the
    router and checked:
        * Every (worker_id, dp_rank) except one has no events (prefix reuse).
        * The (worker_id, dp_rank) that does have events has exactly 4, one
          per request.
        * All events sit on the forced (worker_id, dp_rank=1), which verifies
          both forced routing and prefix reuse.
    """
    run_router_decisions_test(
        # Which engine to launch and how to configure it.
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        component_name="backend",
        # Topology: a single worker running two DP ranks across GPUs.
        num_workers=1,
        single_gpu=False,
        extra_process_kwargs={"data_parallel_size": 2},
        test_dp_rank=True,
        # Test plumbing.
        request=request,
        request_plane=request_plane,
    )
323
324


325
@pytest.mark.skip(reason="Nightly CI failure: https://linear.app/nvidia/issue/DYN-2603")
@pytest.mark.gpu_2
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["nats"], indirect=True)
@pytest.mark.timeout(600)
def test_router_decisions_sglang_disagg(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared disaggregated router-decisions scenario with SGLang.

    Requests two prefill workers (single-GPU mode, starting at GPU index 0)
    and one decode worker (single-GPU mode, starting at GPU index 1).
    """
    # Per-role process options handed to the harness.
    prefill_kwargs = {
        "single_gpu": True,
        "gpu_start_index": 0,
        "disaggregation_mode": "prefill",
    }
    decode_kwargs = {
        "single_gpu": True,
        "gpu_start_index": 1,
        "disaggregation_mode": "decode",
    }

    run_disagg_router_decisions_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        num_prefill_workers=2,
        num_decode_workers=1,
        prefill_process_kwargs=prefill_kwargs,
        decode_process_kwargs=decode_kwargs,
    )


360
361
@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize(
    "store_backend,durable_kv_events,request_plane",
    [("etcd", False, "tcp")],
    ids=["nats_core"],
    indirect=["durable_kv_events", "request_plane"],
)
@pytest.mark.timeout(150)  # ~3x average (~46s/test), rounded up
def test_sglang_indexers_sync(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    file_storage_backend,
    set_ucx_tls_no_mm,
    store_backend,
    durable_kv_events,
    request_plane,
):
    """Run the shared indexer-sync scenario with two SGLang workers.

    The only parametrized case ("nats_core") uses the etcd store backend,
    non-durable KV events and the TCP request plane.
    """
    run_indexers_sync_test(
        # Which engine to launch and how to configure it.
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        num_workers=2,
        # Values supplied by the parametrization above.
        store_backend=store_backend,
        durable_kv_events=durable_kv_events,
        request_plane=request_plane,
        # Test plumbing.
        request=request,
        runtime_services_dynamic_ports=runtime_services_dynamic_ports,
    )