# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Timing notes (measured in an SGLang-enabled container):
# - GPU-1 subset (`-m "gpu_1"`): 92.35s total for 2 tests (+ 1 skipped).
# These tests load a real model and can be slow/flaky when GPU resources are contended,
# so we set explicit pytest timeouts to fail fast on hangs (see per-test markers below).

import logging
import os
from typing import Any, Dict, Optional

import pytest

from tests.router.e2e_harness import (
    ManagedEngineProcessMixin,
    run_basic_router_test,
    run_disagg_router_decisions_test,
    run_indexers_sync_test,
    run_router_decisions_test,
)
from tests.router.helper import generate_random_suffix
from tests.utils.constants import DefaultPort
from tests.utils.managed_process import ManagedProcess
from tests.utils.port_utils import allocate_ports, deallocate_ports

logger = logging.getLogger(__name__)

# Small 2-layer model so that several workers can share a single GPU.
MODEL_NAME = "silence09/DeepSeek-R1-Small-2layers"

# Markers applied to every test in this module.
pytestmark = [
    pytest.mark.e2e,
    pytest.mark.router,
    pytest.mark.sglang,
    pytest.mark.model(MODEL_NAME),
]

PAGE_SIZE = 16  # SGLang uses "page_size" instead of "block_size"

# Shared SGLang configuration for all tests
# mem_fraction_static limits actual VRAM allocation (required for multi-worker on same GPU)
SGLANG_ARGS: Dict[str, Any] = {
    "page_size": PAGE_SIZE,
    "model": MODEL_NAME,
    "mem_fraction_static": 0.4,  # Limit VRAM allocation per worker (equivalent to vLLM's gpu_memory_utilization)
    "context_length": 1024,  # Limit context length to reduce KV cache size (equivalent to vLLM's max_model_len)
    "disable_cuda_graph": True,  # Disable CUDA graphs for faster startup & lower memory (equivalent to vLLM's enforce_eager)
}


class SGLangProcess(ManagedEngineProcessMixin):
    """Manages SGLang workers using dynamo.sglang (HTTP API + KV events).

    This is a drop-in replacement for MockerProcess that uses real SGLang workers.
    The key difference: dynamo.sglang automatically handles:
    - HTTP API serving
    - KV cache event publishing (ZMQ → NATS bridge)
    - Integration with dynamo.frontend router
    """

    # Human-readable labels for this engine type; presumably consumed by
    # ManagedEngineProcessMixin for its start/cleanup messages (TODO confirm).
    process_name = "SGLang worker"
    cleanup_name = "SGLang worker resources"

    def __init__(
        self,
        request,
        sglang_args: Optional[Dict[str, Any]] = None,
        num_workers: int = 2,
        single_gpu: bool = False,
        data_parallel_size: Optional[int] = None,
        request_plane: str = "tcp",
        store_backend: str = "etcd",
        durable_kv_events: bool = False,
        namespace: Optional[str] = None,
        gpu_start_index: int = 0,
        disaggregation_mode: Optional[str] = None,
    ):
        """Initialize SGLang workers with dynamo integration.

        Builds one ``ManagedProcess`` per worker (stored in
        ``self.worker_processes``) running ``python3 -m dynamo.sglang``.

        Args:
            request: pytest ``request`` fixture. Its node name is used as the
                log directory, and a finalizer is registered on it to release
                the ports allocated here.
            sglang_args: Configuration dict with keys:
                - page_size: KV cache page size (default: PAGE_SIZE, i.e. 16)
                - model: Model name/path (default: MODEL_NAME)
                - mem_fraction_static: Fraction of GPU memory to allocate (optional)
                - context_length: Maximum sequence length (optional)
                - disable_cuda_graph: Disable both regular and piecewise CUDA
                  graphs (default: False)
            num_workers: Number of SGLang worker processes.
            single_gpu: If True, every worker is pinned to GPU ``gpu_start_index``.
            data_parallel_size: If set, each worker is launched with this many
                DP ranks (``--dp-size`` and ``--tp-size`` both set to this value,
                with ``--enable-dp-attention``) and is given that many
                consecutive GPUs. Independent of ``num_workers``.
            request_plane: Request plane to use ("nats", "tcp", or "http"). Defaults to "tcp".
            store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
                Workers inherit the parent environment, so DYN_FILE_KV (if set)
                is passed through.
            durable_kv_events: If True, use JetStream for durable KV events. Defaults to False (NATS Core mode).
            namespace: Dynamo namespace. When omitted, a unique
                ``test-namespace-<random suffix>`` is generated for isolation.
            gpu_start_index: First GPU index used when assigning CUDA_VISIBLE_DEVICES.
            disaggregation_mode: Optional SGLang disaggregation mode (e.g.
                "prefill" or "decode"). When set, NIXL is used as the transfer
                backend. Workers in "prefill" mode register under the "prefill"
                component; all others use "backend".
        """
        # Generate unique namespace for isolation
        self.namespace = namespace or f"test-namespace-{generate_random_suffix()}"
        self.component_name = (
            "prefill" if disaggregation_mode == "prefill" else "backend"
        )
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_workers = num_workers
        self.data_parallel_size = data_parallel_size
        self.worker_processes = []
        self.store_backend = store_backend

        # Dynamically allocate unique system and KV event ports (one per worker)
        # to avoid conflicts in parallel test runs.
        # NOTE(review): both pools start searching at SYSTEM1; this assumes
        # allocate_ports never hands out a port it already reserved — confirm.
        self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        self._kv_event_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        request.addfinalizer(
            lambda: deallocate_ports(self._system_ports + self._kv_event_ports)
        )

        if sglang_args is None:
            sglang_args = {}

        page_size = sglang_args.get("page_size", PAGE_SIZE)
        model = sglang_args.get("model", MODEL_NAME)
        mem_fraction_static = sglang_args.get("mem_fraction_static")
        context_length = sglang_args.get("context_length")
        disable_cuda_graph = sglang_args.get("disable_cuda_graph", False)

        self.model_name = model

        for worker_idx in range(num_workers):
            gpu_device = self._visible_devices(
                worker_idx, single_gpu, data_parallel_size, gpu_start_index
            )
            # Ports are dynamically allocated for xdist-safe parallel execution.
            kv_events_port = self._kv_event_ports[worker_idx]
            system_port = self._system_ports[worker_idx]

            command = [
                "python3",
                "-m",
                "dynamo.sglang",
                "--model-path",
                model,
                "--page-size",
                str(page_size),
            ]

            # Disable CUDA graphs for faster startup & lower memory.
            # sglang 0.5.10+ has piecewise CUDA graphs (separate flag) that
            # consume ~7 GB during capture — must also be disabled for
            # multi-worker same-GPU tests to avoid OOM.
            if disable_cuda_graph:
                command.extend(
                    ["--disable-cuda-graph", "--disable-piecewise-cuda-graph"]
                )

            # Limit VRAM allocation (required for multi-worker on same GPU)
            if mem_fraction_static is not None:
                command.extend(["--mem-fraction-static", str(mem_fraction_static)])

            if context_length is not None:
                command.extend(["--context-length", str(context_length)])

            if disaggregation_mode is not None:
                command.extend(
                    [
                        "--disaggregation-mode",
                        disaggregation_mode,
                        "--disaggregation-transfer-backend",
                        "nixl",
                    ]
                )

            if data_parallel_size is not None:
                # Every DP rank is also a TP rank, with DP attention enabled.
                command.extend(
                    [
                        "--dp-size",
                        str(data_parallel_size),
                        "--tp-size",
                        str(data_parallel_size),
                        "--enable-dp-attention",
                    ]
                )

            # Per-worker ZMQ endpoint for KV cache event publishing.
            kv_events_config = f'{{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:{kv_events_port}"}}'
            command.extend(["--kv-events-config", kv_events_config])

            # Use --durable-kv-events to enable JetStream mode (local indexer disabled)
            if durable_kv_events:
                command.append("--durable-kv-events")

            # Start from the parent environment (which already carries
            # DYN_FILE_KV when the file store backend is in use), then add
            # per-worker settings. Each worker needs its own DYN_SYSTEM_PORT.
            env = os.environ.copy()
            env.update(
                {
                    "CUDA_VISIBLE_DEVICES": gpu_device,
                    "DYN_NAMESPACE": self.namespace,
                    "DYN_REQUEST_PLANE": request_plane,
                    "DYN_SYSTEM_PORT": str(system_port),
                    "PYTHONHASHSEED": "0",  # for deterministic event id's
                }
            )

            process = ManagedProcess(
                command=command,
                env=env,
                timeout=120,  # Allow time for model loading
                display_output=True,
                health_check_ports=[],
                health_check_urls=[],
                log_dir=request.node.name,
                terminate_all_matching_process_names=False,
            )
            self.worker_processes.append(process)

            if data_parallel_size is not None:
                logger.info(
                    f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} "
                    f"(mem_frac={mem_fraction_static}, system_port={system_port}, kv_port={kv_events_port}) "
                    f"with endpoint: {self.endpoint}"
                )
            else:
                logger.info(
                    f"Created SGLang worker {worker_idx} on GPU {gpu_device} "
                    f"(mem_frac={mem_fraction_static}, system_port={system_port}, kv_port={kv_events_port}) "
                    f"with endpoint: {self.endpoint}"
                )

    @staticmethod
    def _visible_devices(
        worker_idx: int,
        single_gpu: bool,
        data_parallel_size: Optional[int],
        gpu_start_index: int,
    ) -> str:
        """Return the CUDA_VISIBLE_DEVICES value for worker ``worker_idx``.

        - ``single_gpu``: every worker shares GPU ``gpu_start_index``.
        - DP enabled: worker ``i`` gets the ``data_parallel_size`` consecutive
          GPUs starting at ``gpu_start_index + i * data_parallel_size``.
        - Otherwise: worker ``i`` gets GPU ``gpu_start_index + i``.
        """
        if single_gpu:
            return str(gpu_start_index)
        if data_parallel_size is not None:
            first_gpu = gpu_start_index + worker_idx * data_parallel_size
            return ",".join(
                str(gpu) for gpu in range(first_gpu, first_gpu + data_parallel_size)
            )
        return str(gpu_start_index + worker_idx)


@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(150)  # ~3x average (~46s/test), rounded up
def test_sglang_kv_router_basic(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the basic KV-router scenario against two SGLang workers sharing one GPU."""
    run_basic_router_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        num_workers=2,
        single_gpu=True,
        request=request,
        request_plane=request_plane,
        block_size=PAGE_SIZE,
        model_name=MODEL_NAME,
    )


@pytest.mark.skip(
    reason="Nightly CI failure: https://linear.app/nvidia/issue/DYN-2784"
)  # Worker #2 dies during startup; KvRouter blocks forever waiting for min_initial_workers=2.
@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
def test_router_decisions_sglang_multiple_workers(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the router-decisions scenario with two SGLang workers on one GPU.

    DP-rank forcing is disabled (``test_dp_rank=False``); see
    test_router_decisions_sglang_dp for the DP variant.
    """
    run_router_decisions_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        component_name="backend",
        num_workers=2,
        single_gpu=True,
        test_dp_rank=False,
    )


@pytest.mark.gpu_2
@pytest.mark.pre_merge
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(600)  # 10 min max (multi-GPU + DP startup variance)
@pytest.mark.skip(
    reason="DYN-2265"
)  # Currently fails probably due to SGLang startup issues when multiple workers on same GPU; re-enable when fixed
def test_router_decisions_sglang_dp(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Validate KV cache prefix reuse with SGLang by sending progressive requests with overlapping prefixes.

    Same flow as test_router_decisions_sglang_multiple_workers, but with one
    worker running 2 DP ranks across 2 GPUs; the first request is forced to
    (worker_id, dp_rank=1). Dump events from the router and verify:
        * All but one (worker_id, dp_rank) should have no events (due to prefix reuse)
        * The (worker_id, dp_rank) with events should have exactly 4 events (one per request)
        * All events should be on the forced (worker_id, dp_rank=1) (verifying forced routing and prefix reuse)
    """
    run_router_decisions_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        component_name="backend",
        num_workers=1,
        single_gpu=False,
        test_dp_rank=True,
        extra_process_kwargs={"data_parallel_size": 2},
    )


@pytest.mark.skip(reason="Nightly CI failure: https://linear.app/nvidia/issue/DYN-2603")
@pytest.mark.gpu_2
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["nats"], indirect=True)
@pytest.mark.timeout(600)
def test_router_decisions_sglang_disagg(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the disaggregated router-decisions scenario over the NATS request plane.

    Layout: two prefill workers sharing GPU 0 and one decode worker on GPU 1.
    """
    run_disagg_router_decisions_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=PAGE_SIZE,
        num_prefill_workers=2,
        num_decode_workers=1,
        prefill_process_kwargs={
            "single_gpu": True,
            "gpu_start_index": 0,
            "disaggregation_mode": "prefill",
        },
        decode_process_kwargs={
            "single_gpu": True,
            "gpu_start_index": 1,
            "disaggregation_mode": "decode",
        },
    )


# DYN-2784: Fixture setup hangs silently in nightly only (worker #2 dies
# in SGLangProcess launch, KvRouter blocks forever on min_initial_workers=2;
# pytest.mark.timeout signal gets swallowed at the C-level syscall).
# Passes reliably in pre_merge/post_merge runs, so scope the skip to the
# nightly pipeline via skip_in_nightly, which nightly-ci.yml excludes from
# its sglang single-GPU marker filter. Remove once DYN-2784 lands a real fix.
@pytest.mark.skip_in_nightly
@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.parametrize(
    "store_backend,durable_kv_events,request_plane",
    [
        ("etcd", False, "tcp"),
    ],
    ids=["nats_core"],
    indirect=["durable_kv_events", "request_plane"],
)
@pytest.mark.timeout(150)  # ~3x average (~46s/test), rounded up
def test_sglang_indexers_sync(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    file_storage_backend,
    set_ucx_tls_no_mm,
    store_backend,
    durable_kv_events,
    request_plane,
):
    """Run the indexer-sync scenario against two SGLang workers.

    The ``nats_core`` id refers to the KV-event mode: ``durable_kv_events=False``
    means NATS Core rather than JetStream. The request plane itself is TCP.
    ``file_storage_backend`` is requested for its fixture side effects,
    presumably setting DYN_FILE_KV for the "file" store backend (TODO confirm).
    """
    run_indexers_sync_test(
        engine_process_cls=SGLangProcess,
        engine_args_name="sglang_args",
        engine_args=SGLANG_ARGS,
        request=request,
        runtime_services_dynamic_ports=runtime_services_dynamic_ports,
        store_backend=store_backend,
        durable_kv_events=durable_kv_events,
        request_plane=request_plane,
        block_size=PAGE_SIZE,
        model_name=MODEL_NAME,
        num_workers=2,
    )