# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Timing notes (measured in an SGLang-enabled container):
# - GPU-1 subset (`-m "gpu_1"`): 92.35s total for 2 tests (+ 1 skipped).
# These tests load a real model and can be slow or flaky when GPU resources are contended,
# so explicit pytest timeouts are set to fail fast on hangs (see the per-test markers below).

import logging
import os
from typing import Any, Dict, Optional

import pytest

from tests.router.e2e_harness import (
    ManagedEngineProcessMixin,
    run_basic_router_test,
    run_disagg_router_decisions_test,
    run_indexers_sync_test,
    run_router_decisions_test,
)
from tests.router.helper import generate_random_suffix
from tests.utils.constants import DefaultPort
from tests.utils.managed_process import ManagedProcess
from tests.utils.port_utils import allocate_ports, deallocate_ports

logger = logging.getLogger(__name__)

MODEL_NAME = "silence09/DeepSeek-R1-Small-2layers"

# SGLang calls its KV-cache granularity "page_size" rather than "block_size";
# the router tests below pass this same value as their block size.
PAGE_SIZE = 16

pytestmark = [
    pytest.mark.e2e,
    pytest.mark.router,
    pytest.mark.sglang,
    pytest.mark.model(MODEL_NAME),
]

# Engine configuration shared by every test in this module.
# mem_fraction_static caps the VRAM each worker grabs, which is what lets
# several workers coexist on a single GPU.
SGLANG_ARGS: Dict[str, Any] = {
    "page_size": PAGE_SIZE,
    "model": MODEL_NAME,
    # Per-worker VRAM cap (counterpart of vLLM's gpu_memory_utilization).
    "mem_fraction_static": 0.4,
    # Short context keeps the KV cache small (counterpart of vLLM's max_model_len).
    "context_length": 1024,
    # Skip CUDA graph capture: faster startup, less memory (counterpart of vLLM's enforce_eager).
    "disable_cuda_graph": True,
}


class SGLangProcess(ManagedEngineProcessMixin):
    """Manages SGLang workers using dynamo.sglang (HTTP API + KV events).

    This is a drop-in replacement for MockerProcess that uses real SGLang workers.
    The key difference: dynamo.sglang automatically handles:
    - HTTP API serving
    - KV cache event publishing (ZMQ → NATS bridge)
    - Integration with dynamo.frontend router

    ``__init__`` only builds one ``ManagedProcess`` per worker and collects them
    in ``self.worker_processes``. Launching and tearing them down is presumably
    handled by ``ManagedEngineProcessMixin``, which is not visible here.
    """

    # Human-readable labels. Presumably consumed by ManagedEngineProcessMixin
    # for its log/cleanup messages — TODO confirm against the mixin.
    process_name = "SGLang worker"
    cleanup_name = "SGLang worker resources"

    def __init__(
        self,
        request,
        sglang_args: Optional[Dict[str, Any]] = None,
        num_workers: int = 2,
        single_gpu: bool = False,
        data_parallel_size: Optional[int] = None,
        request_plane: str = "tcp",
        store_backend: str = "etcd",
        durable_kv_events: bool = False,
        namespace: Optional[str] = None,
        gpu_start_index: int = 0,
        disaggregation_mode: Optional[str] = None,
    ):
        """Initialize SGLang workers with dynamo integration.

        Args:
            request: pytest request fixture. Its node name is used as the log
                directory, and it receives a finalizer that releases the ports
                allocated here.
            sglang_args: Configuration dict with keys:
                - page_size: KV cache page size (default: PAGE_SIZE, i.e. 16)
                - model: Model name/path (default: MODEL_NAME)
                - mem_fraction_static: Fraction of GPU memory to allocate (optional)
                - context_length: Maximum sequence length (optional)
                - disable_cuda_graph: Disable CUDA graphs (default: False)
            num_workers: Number of SGLang worker processes.
            single_gpu: If True, all workers share GPU ``gpu_start_index``.
                Takes precedence over ``data_parallel_size`` for GPU
                assignment (the DP flags are still passed to SGLang).
            data_parallel_size: If set, enables data parallelism with this many
                ranks per worker. Worker ``i`` sees the ``data_parallel_size``
                consecutive GPUs starting at
                ``gpu_start_index + i * data_parallel_size``, and SGLang is
                launched with matching ``--dp-size``/``--tp-size`` and
                ``--enable-dp-attention``.
            request_plane: Request plane to use ("nats", "tcp", or "http"). Defaults to "tcp".
            store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
            durable_kv_events: If True, use JetStream for durable KV events.
                Defaults to False (NATS Core mode).
            namespace: Dynamo namespace for the workers. Defaults to a fresh
                ``test-namespace-<random suffix>`` for isolation.
            gpu_start_index: First GPU index used when assigning devices.
            disaggregation_mode: When set, it is passed to
                ``--disaggregation-mode`` together with the NIXL transfer
                backend. ``"prefill"`` workers register under the ``prefill``
                component; all other workers register under ``backend``.
        """
        # Generate unique namespace for isolation.
        namespace_suffix = generate_random_suffix()
        self.namespace = namespace or f"test-namespace-{namespace_suffix}"
        self.component_name = (
            "prefill" if disaggregation_mode == "prefill" else "backend"
        )
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_workers = num_workers
        self.data_parallel_size = data_parallel_size
        self.worker_processes = []
        self.store_backend = store_backend

        # One system port and one KV-event port per worker, allocated
        # dynamically to avoid conflicts in parallel (xdist) runs. Both sets
        # come from a single allocate_ports() call. They share a base port, so
        # allocating them together guarantees they cannot overlap.
        ports = allocate_ports(2 * num_workers, DefaultPort.SYSTEM1.value)
        self._system_ports = ports[:num_workers]
        self._kv_event_ports = ports[num_workers:]
        request.addfinalizer(
            lambda: deallocate_ports(self._system_ports + self._kv_event_ports)
        )

        if sglang_args is None:
            sglang_args = {}

        page_size = sglang_args.get("page_size", PAGE_SIZE)
        model = sglang_args.get("model", MODEL_NAME)
        mem_fraction_static = sglang_args.get("mem_fraction_static")
        context_length = sglang_args.get("context_length")
        disable_cuda_graph = sglang_args.get("disable_cuda_graph", False)

        self.model_name = model

        for worker_idx in range(num_workers):
            gpu_device = self._visible_gpus_for_worker(
                worker_idx, single_gpu, data_parallel_size, gpu_start_index
            )
            system_port = self._system_ports[worker_idx]
            kv_events_port = self._kv_event_ports[worker_idx]

            command = [
                "python3",
                "-m",
                "dynamo.sglang",
                "--model-path",
                model,
                "--page-size",
                str(page_size),
            ]

            # Disable CUDA graphs for faster startup & lower memory.
            if disable_cuda_graph:
                command.append("--disable-cuda-graph")

            # Limit VRAM allocation (required for multi-worker on same GPU).
            if mem_fraction_static is not None:
                command.extend(["--mem-fraction-static", str(mem_fraction_static)])

            if context_length is not None:
                command.extend(["--context-length", str(context_length)])

            if disaggregation_mode is not None:
                command.extend(["--disaggregation-mode", disaggregation_mode])
                command.extend(["--disaggregation-transfer-backend", "nixl"])

            if data_parallel_size is not None:
                # DP attention: dp-size and tp-size are both set to the DP size.
                command.extend(
                    [
                        "--dp-size",
                        str(data_parallel_size),
                        "--tp-size",
                        str(data_parallel_size),
                        "--enable-dp-attention",
                    ]
                )

            # Per-worker ZMQ publisher for KV events, on a dedicated port.
            kv_events_config = f'{{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:{kv_events_port}"}}'
            command.extend(["--kv-events-config", kv_events_config])

            # Use --durable-kv-events to enable JetStream mode (local indexer disabled).
            if durable_kv_events:
                command.append("--durable-kv-events")

            env = os.environ.copy()
            env_vars = {
                "CUDA_VISIBLE_DEVICES": gpu_device,
                "DYN_NAMESPACE": self.namespace,
                "DYN_REQUEST_PLANE": request_plane,
                # Each worker needs a unique DYN_SYSTEM_PORT to avoid conflicts.
                "DYN_SYSTEM_PORT": str(system_port),
                "PYTHONHASHSEED": "0",  # for deterministic event id's
            }

            # Forward DYN_FILE_KV explicitly when using the file storage backend.
            # It is already present via os.environ.copy(); this just makes the
            # dependency visible.
            if self.store_backend == "file" and "DYN_FILE_KV" in os.environ:
                env_vars["DYN_FILE_KV"] = os.environ["DYN_FILE_KV"]

            env.update(env_vars)

            process = ManagedProcess(
                command=command,
                env=env,
                timeout=120,  # Allow time for model loading
                display_output=True,
                health_check_ports=[],
                health_check_urls=[],
                log_dir=request.node.name,
                terminate_all_matching_process_names=False,
            )
            self.worker_processes.append(process)

            if data_parallel_size is not None:
                logger.info(
                    "Created %s DP ranks per worker on GPU(s) %s "
                    "(mem_frac=%s, system_port=%s, kv_port=%s) "
                    "with endpoint: %s",
                    data_parallel_size,
                    gpu_device,
                    mem_fraction_static,
                    system_port,
                    kv_events_port,
                    self.endpoint,
                )
            else:
                logger.info(
                    "Created SGLang worker %s on GPU %s "
                    "(mem_frac=%s, system_port=%s, kv_port=%s) "
                    "with endpoint: %s",
                    worker_idx,
                    gpu_device,
                    mem_fraction_static,
                    system_port,
                    kv_events_port,
                    self.endpoint,
                )

    @staticmethod
    def _visible_gpus_for_worker(
        worker_idx: int,
        single_gpu: bool,
        data_parallel_size: Optional[int],
        gpu_start_index: int,
    ) -> str:
        """Return the CUDA_VISIBLE_DEVICES value for worker ``worker_idx``."""
        if single_gpu:
            # All workers share one GPU (single-GPU testing).
            return str(gpu_start_index)
        if data_parallel_size is not None:
            # Each worker owns a contiguous block of GPUs, one per DP rank.
            first_gpu = gpu_start_index + worker_idx * data_parallel_size
            return ",".join(
                str(i) for i in range(first_gpu, first_gpu + data_parallel_size)
            )
        # No DP: one GPU per worker.
        return str(gpu_start_index + worker_idx)


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(150)  # ~3x average (~46s/test), rounded up
def test_sglang_kv_router_basic(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared basic KV-router scenario on two SGLang workers sharing one GPU.

    The fixtures other than ``request`` and ``request_plane`` are requested
    only for their setup side effects.
    """
    run_basic_router_test(
        request=request,
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        num_workers=2,
        single_gpu=True,
        request_plane=request_plane,
    )


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
# NOTE(review): unlike the other pre-merge gpu_1 tests in this module, this one
# carries no explicit @pytest.mark.timeout. A hang is bounded only by any
# global timeout — consider adding one once its runtime has been measured.
def test_router_decisions_sglang_multiple_workers(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the router-decisions scenario on two SGLang workers sharing one GPU.

    DP-rank routing is not exercised here (``test_dp_rank=False``);
    ``test_router_decisions_sglang_dp`` covers that variant.
    """
    run_router_decisions_test(
        request=request,
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        component_name="backend",
        num_workers=2,
        single_gpu=True,
        request_plane=request_plane,
        test_dp_rank=False,
    )


@pytest.mark.gpu_2
@pytest.mark.pre_merge
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(600)  # 10 min max (multi-GPU + DP startup variance)
# Currently fails, probably due to SGLang startup issues when multiple workers
# share the same GPU; re-enable when fixed.
# NOTE(review): this test launches a single worker with data_parallel_size=2 and
# single_gpu=False (two GPUs), so the reason above may be stale — confirm
# against DYN-2265.
@pytest.mark.skip(reason="DYN-2265")
def test_router_decisions_sglang_dp(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Check SGLang KV-cache prefix reuse across data-parallel ranks.

    Follows the same flow as test_router_decisions_sglang_multiple_workers:
    progressive requests with overlapping prefixes, except the first request
    is forced to (worker_id, dp_rank=1). Router events are then dumped and
    checked:
        * every (worker_id, dp_rank) except one has no events (prefix reuse)
        * that one (worker_id, dp_rank) has exactly 4 events, one per request
        * all events land on the forced (worker_id, dp_rank=1), confirming
          both forced routing and prefix reuse
    """
    run_router_decisions_test(
        request=request,
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        component_name="backend",
        num_workers=1,
        single_gpu=False,
        request_plane=request_plane,
        test_dp_rank=True,
        extra_process_kwargs={"data_parallel_size": 2},
    )


@pytest.mark.skip(reason="Nightly CI failure: https://linear.app/nvidia/issue/DYN-2603")
@pytest.mark.gpu_2
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["nats"], indirect=True)
@pytest.mark.timeout(600)
def test_router_decisions_sglang_disagg(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the disaggregated router-decisions scenario with SGLang.

    Two prefill workers share GPU 0 and a single decode worker runs on GPU 1.
    """
    prefill_kwargs = {
        "single_gpu": True,
        "gpu_start_index": 0,
        "disaggregation_mode": "prefill",
    }
    decode_kwargs = {
        "single_gpu": True,
        "gpu_start_index": 1,
        "disaggregation_mode": "decode",
    }
    run_disagg_router_decisions_test(
        request=request,
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        request_plane=request_plane,
        num_prefill_workers=2,
        num_decode_workers=1,
        prefill_process_kwargs=prefill_kwargs,
        decode_process_kwargs=decode_kwargs,
    )


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize(
    "store_backend,durable_kv_events,request_plane",
    [("etcd", False, "tcp")],
    ids=["nats_core"],
    indirect=["durable_kv_events", "request_plane"],
)
@pytest.mark.timeout(150)  # ~3x average (~46s/test), rounded up
def test_sglang_indexers_sync(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    file_storage_backend,
    set_ucx_tls_no_mm,
    store_backend,
    durable_kv_events,
    request_plane,
):
    """Run the shared indexer-sync scenario against two SGLang workers.

    The single parametrization uses the etcd store with non-durable KV events
    (NATS Core mode, hence the ``nats_core`` id) over the TCP request plane.
    """
    run_indexers_sync_test(
        request=request,
        runtime_services_dynamic_ports=runtime_services_dynamic_ports,
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        num_workers=2,
        store_backend=store_backend,
        durable_kv_events=durable_kv_events,
        request_plane=request_plane,
    )