# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Timing notes (measured in a TRT-LLM-enabled container):
# - GPU-1 subset (`-m "gpu_1"`): 136.36s total for 3 tests.
# These tests load a real model and can be slow/flaky when GPU resources are contended,
# so we set explicit pytest timeouts to fail fast on hangs (see per-test markers below).

import logging
import os
from typing import Any, Dict, Optional

import pytest

14
15
16
from tests.router.e2e_harness import (
    ManagedEngineProcessMixin,
    run_basic_router_test,
17
    run_disagg_router_decisions_test,
18
19
    run_indexers_sync_test,
    run_router_decisions_test,
20
)
21
from tests.router.helper import generate_random_suffix
22
from tests.utils.constants import DefaultPort
23
from tests.utils.managed_process import ManagedProcess
24
from tests.utils.port_utils import allocate_ports, deallocate_ports
25
26
27
28
29
30
31
32

# Module-level logger, named after this test module.
logger = logging.getLogger(__name__)

# Model served by every worker in this module, and the KV block size that is
# passed both to the workers and to the shared harness as `block_size`.
MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
TRTLLM_BLOCK_SIZE = 32  # fixed internally to 32

# Marks applied to every test collected from this module.
pytestmark = [
    pytest.mark.e2e,
    pytest.mark.router,
    pytest.mark.trtllm,
    pytest.mark.model(MODEL_NAME),
]

# Shared TRT-LLM configuration for all tests.
# free_gpu_memory_fraction limits actual VRAM allocation, which is required
# when several workers are placed on the same GPU.
TRTLLM_ARGS: Dict[str, Any] = {
    "kv_block_size": TRTLLM_BLOCK_SIZE,
    "model": MODEL_NAME,
    "free_gpu_memory_fraction": 0.4,  # Limit VRAM allocation per worker
    "max_seq_len": 1024,  # Limit context length to reduce KV cache size
}


48
class TRTLLMProcess(ManagedEngineProcessMixin):
    """Manages TRT-LLM workers using dynamo.trtllm (HTTP API + KV events).

    This is a drop-in replacement for MockerProcess that uses real TRT-LLM workers.
    The key difference: dynamo.trtllm automatically handles:
    - HTTP API serving
    - KV cache event publishing
    - Integration with dynamo.frontend router

    One ``ManagedProcess`` is built per worker and stored in
    ``self.worker_processes``.
    """

    # NOTE(review): presumably read by ManagedEngineProcessMixin for log and
    # cleanup messages; the mixin is not visible in this file - confirm.
    process_name = "TRT-LLM worker"
    cleanup_name = "TRT-LLM worker resources"

    def __init__(
        self,
        request,
        trtllm_args: Optional[Dict[str, Any]] = None,
        num_workers: int = 2,
        single_gpu: bool = False,
        request_plane: str = "tcp",
        store_backend: str = "etcd",
        durable_kv_events: bool = False,
        namespace: Optional[str] = None,
        gpu_start_index: int = 0,
        disaggregation_mode: Optional[str] = None,
    ):
        """Initialize TRT-LLM workers with dynamo integration.

        Args:
            request: pytest request fixture; used for the per-test log directory
                and to register the finalizer that releases the system ports.
            trtllm_args: Configuration dict with keys:
                - kv_block_size: KV cache block size (default: TRTLLM_BLOCK_SIZE, 32)
                - model: Model name/path (default: MODEL_NAME, TinyLlama-1.1B)
                - free_gpu_memory_fraction: Fraction of GPU memory to allocate (optional)
                - max_seq_len: Maximum sequence length (optional)
                - tensor_parallel_size: Number of GPUs for tensor parallelism (optional).
                  When attention DP is enabled, this sets the world size, which then is
                  the attention_dp_size.
                - enable_attention_dp: If True, enable TRT-LLM attention data parallelism.
                  When enabled, attention_dp_size equals tensor_parallel_size, creating
                  multiple routing targets within a single TRT-LLM worker process.
            num_workers: Number of TRT-LLM worker processes.
            single_gpu: If True, every worker is pinned to the single GPU
                ``gpu_start_index`` (GPU 0 by default).
            request_plane: Request plane to use ("nats", "tcp", or "http"). Defaults to "tcp".
            store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
            durable_kv_events: If True, use JetStream for durable KV events.
                Defaults to False (NATS Core mode).
            namespace: Namespace to register the workers under. When omitted, a
                unique ``test-namespace-<random suffix>`` is generated for isolation.
            gpu_start_index: Index of the first GPU made visible to the workers.
            disaggregation_mode: Optional value forwarded as ``--disaggregation-mode``.
                The value "prefill" also switches the component name to "prefill";
                any other value keeps "tensorrt_llm".

        Note: TRT-LLM supports two forms of parallelism for routing:
              1. Multiple workers (num_workers > 1): Each worker is a separate routing target
              2. Attention DP (enable_attention_dp=True in trtllm_args): Single worker with
                 multiple internal attention DP ranks, each being a separate routing target
        """
        # A random suffix is only needed when the caller did not pick a namespace.
        self.namespace = namespace or f"test-namespace-{generate_random_suffix()}"
        self.component_name = (
            "prefill" if disaggregation_mode == "prefill" else "tensorrt_llm"
        )
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_workers = num_workers
        self.worker_processes = []
        self.store_backend = store_backend

        # Dynamically allocate unique system ports (one per worker) to avoid
        # conflicts when tests run in parallel via pytest-xdist.
        self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        request.addfinalizer(lambda: deallocate_ports(self._system_ports))

        if trtllm_args is None:
            trtllm_args = {}

        model = trtllm_args.get("model", MODEL_NAME)
        # Honor the documented kv_block_size key instead of silently ignoring it.
        kv_block_size = trtllm_args.get("kv_block_size", TRTLLM_BLOCK_SIZE)
        free_gpu_memory_fraction = trtllm_args.get("free_gpu_memory_fraction")
        max_seq_len = trtllm_args.get("max_seq_len")
        enable_attention_dp = trtllm_args.get("enable_attention_dp", False)
        tensor_parallel_size = trtllm_args.get("tensor_parallel_size")

        self.model_name = model

        # The command line is identical for every worker, so build it once;
        # workers differ only in their environment (GPU visibility, system port).
        base_command = _trtllm_worker_command(
            model=model,
            kv_block_size=kv_block_size,
            disaggregation_mode=disaggregation_mode,
            free_gpu_memory_fraction=free_gpu_memory_fraction,
            max_seq_len=max_seq_len,
            durable_kv_events=durable_kv_events,
            tensor_parallel_size=tensor_parallel_size,
            enable_attention_dp=enable_attention_dp,
        )

        for worker_idx in range(num_workers):
            gpu_device = _trtllm_visible_gpus(
                worker_idx=worker_idx,
                gpu_start_index=gpu_start_index,
                single_gpu=single_gpu,
                enable_attention_dp=enable_attention_dp,
                tensor_parallel_size=tensor_parallel_size,
            )

            # Each TRT-LLM worker needs a unique DYN_SYSTEM_PORT to avoid conflicts.
            # Ports are dynamically allocated for xdist-safe parallel execution.
            system_port = self._system_ports[worker_idx]

            env = os.environ.copy()  # Copy parent environment
            env_vars = {
                "CUDA_VISIBLE_DEVICES": gpu_device,
                "DYN_NAMESPACE": self.namespace,
                "DYN_REQUEST_PLANE": request_plane,
                "PYTHONHASHSEED": "0",  # for deterministic event id's
                "DYN_SYSTEM_PORT": str(system_port),
            }

            # Add DYN_FILE_KV if using file storage backend
            if self.store_backend == "file" and "DYN_FILE_KV" in os.environ:
                env_vars["DYN_FILE_KV"] = os.environ["DYN_FILE_KV"]

            env.update(env_vars)

            # Create managed process for the worker. Each worker gets its own
            # copy of the command list so no list object is shared between them.
            process = ManagedProcess(
                command=list(base_command),
                env=env,
                timeout=180,  # Allow time for model loading (TRT-LLM may take longer)
                display_output=True,
                health_check_ports=[],
                health_check_urls=[],
                log_dir=request.node.name,
                terminate_all_matching_process_names=False,
            )
            self.worker_processes.append(process)
            logger.info(
                "Created TRT-LLM worker %s on GPU %s "
                "(gpu_mem_frac=%s, system_port=%s) "
                "with endpoint: %s",
                worker_idx,
                gpu_device,
                free_gpu_memory_fraction,
                system_port,
                self.endpoint,
            )


def _trtllm_visible_gpus(
    *,
    worker_idx: int,
    gpu_start_index: int,
    single_gpu: bool,
    enable_attention_dp: bool,
    tensor_parallel_size: Optional[int],
) -> str:
    """Return the CUDA_VISIBLE_DEVICES value for the worker at ``worker_idx``."""
    if single_gpu:
        # Every worker shares the one GPU at gpu_start_index.
        return str(gpu_start_index)
    if enable_attention_dp and tensor_parallel_size:
        # For attention DP, TRT-LLM spawns tensor_parallel_size internal MPI
        # workers, so one process needs visibility into that many GPUs.
        # NOTE(review): the range does not depend on worker_idx, so several
        # attention-DP workers would all see the same GPUs - confirm intended.
        return ",".join(
            str(gpu_start_index + offset) for offset in range(tensor_parallel_size)
        )
    # Each worker sees one GPU
    return str(gpu_start_index + worker_idx)


def _trtllm_worker_command(
    *,
    model: str,
    kv_block_size: int,
    disaggregation_mode: Optional[str],
    free_gpu_memory_fraction: Optional[float],
    max_seq_len: Optional[int],
    durable_kv_events: bool,
    tensor_parallel_size: Optional[int],
    enable_attention_dp: bool,
) -> list:
    """Build the ``python3 -m dynamo.trtllm`` argv shared by all workers."""
    # Single-node TRT-LLM workers use python3 -m dynamo.trtllm directly
    # (trtllm-llmapi-launch is only needed for multi-node MPI deployments)
    command = [
        "python3",
        "-m",
        "dynamo.trtllm",
        "--model-path",
        model,
        "--kv-block-size",
        str(kv_block_size),
        # Enable KV events publishing for router integration
        "--publish-events-and-metrics",
    ]

    if disaggregation_mode is not None:
        command.extend(["--disaggregation-mode", disaggregation_mode])

    # Limit VRAM allocation (required for multi-worker on same GPU)
    if free_gpu_memory_fraction is not None:
        command.extend(["--free-gpu-memory-fraction", str(free_gpu_memory_fraction)])

    # Add optional max_seq_len if specified
    if max_seq_len is not None:
        command.extend(["--max-seq-len", str(max_seq_len)])

    # Use --durable-kv-events to enable JetStream mode (local indexer disabled)
    if durable_kv_events:
        command.append("--durable-kv-events")

    # Set tensor parallel size if specified (needed for attention DP)
    if tensor_parallel_size is not None:
        command.extend(["--tensor-parallel-size", str(tensor_parallel_size)])

    # Enable attention data parallelism if requested
    if enable_attention_dp:
        command.append("--enable-attention-dp")

    return command
216
217
218


@pytest.mark.gpu_1
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(300)
def test_trtllm_kv_router_basic(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared basic router scenario against TRT-LLM workers.

    Delegates to ``run_basic_router_test`` and only supplies the
    TRT-LLM-specific pieces: the engine process class, the shared
    ``TRTLLM_ARGS`` and a request for two workers with ``single_gpu=True``.
    """
    run_basic_router_test(
        request=request,
        request_plane=request_plane,
        engine_process_cls=TRTLLMProcess,
        engine_args_name="trtllm_args",
        engine_args=TRTLLM_ARGS,
        model_name=MODEL_NAME,
        block_size=TRTLLM_BLOCK_SIZE,
        num_workers=2,
        single_gpu=True,
    )
240
241


242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
@pytest.mark.gpu_2
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(600)  # 10 min max (multi-GPU + DP startup variance)
def test_router_decisions_trtllm_attention_dp(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Validate KV cache prefix reuse with TRT-LLM attention DP.

    Progressive requests with overlapping prefixes are sent, following the same
    flow as test_router_decisions_trtllm_multiple_workers, except that the first
    request is forced to (worker_id, dp_rank=1).
    Events are dumped from the router and checked so that:
        * All but one (worker_id, dp_rank) have no events (due to prefix reuse)
        * The (worker_id, dp_rank) with events has exactly 4 events (one per request)
        * All events are on the forced (worker_id, dp_rank=1), verifying both the
          forced routing and the prefix reuse
    """
    # Shared settings plus attention DP over two tensor-parallel ranks.
    attention_dp_args = dict(
        TRTLLM_ARGS,
        enable_attention_dp=True,
        tensor_parallel_size=2,
    )
    run_router_decisions_test(
        request=request,
        request_plane=request_plane,
        engine_process_cls=TRTLLMProcess,
        engine_args_name="trtllm_args",
        engine_args=attention_dp_args,
        model_name=MODEL_NAME,
        block_size=TRTLLM_BLOCK_SIZE,
        component_name="tensorrt_llm",
        num_workers=1,
        single_gpu=False,
        test_dp_rank=True,
    )
277
278


279
@pytest.mark.gpu_1
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(150)  # ~3x average (~45s/test), rounded up
def test_router_decisions_trtllm_multiple_workers(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared router-decisions scenario across two TRT-LLM workers.

    Delegates to ``run_router_decisions_test`` with two workers,
    ``single_gpu=True`` and ``test_dp_rank=False``, i.e. the routing targets
    are whole workers rather than attention-DP ranks.
    """
    run_router_decisions_test(
        request=request,
        request_plane=request_plane,
        engine_process_cls=TRTLLMProcess,
        engine_args_name="trtllm_args",
        engine_args=TRTLLM_ARGS,
        model_name=MODEL_NAME,
        block_size=TRTLLM_BLOCK_SIZE,
        component_name="tensorrt_llm",
        num_workers=2,
        single_gpu=True,
        test_dp_rank=False,
    )
303
304


305
@pytest.mark.skip(reason="Nightly CI failure: https://linear.app/nvidia/issue/DYN-2609")
@pytest.mark.gpu_2
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["nats"], indirect=True)
@pytest.mark.timeout(600)
def test_router_decisions_trtllm_disagg(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared disaggregated router-decisions scenario with TRT-LLM.

    Delegates to ``run_disagg_router_decisions_test`` with two prefill workers
    and one decode worker. Both groups are requested with ``single_gpu=True``;
    prefill starts at GPU index 0 and decode at GPU index 1. Currently skipped
    (see the linked issue in the skip marker).
    """
    # Extra TRTLLMProcess keyword arguments for each side of the split.
    prefill_kwargs = {
        "single_gpu": True,
        "gpu_start_index": 0,
        "disaggregation_mode": "prefill",
    }
    decode_kwargs = {
        "single_gpu": True,
        "gpu_start_index": 1,
        "disaggregation_mode": "decode",
    }
    run_disagg_router_decisions_test(
        request=request,
        request_plane=request_plane,
        engine_process_cls=TRTLLMProcess,
        engine_args_name="trtllm_args",
        engine_args=TRTLLM_ARGS,
        model_name=MODEL_NAME,
        block_size=TRTLLM_BLOCK_SIZE,
        num_prefill_workers=2,
        num_decode_workers=1,
        prefill_process_kwargs=prefill_kwargs,
        decode_process_kwargs=decode_kwargs,
    )


340
@pytest.mark.gpu_1
@pytest.mark.nightly
@pytest.mark.timeout(150)  # ~3x average (~45s/test), rounded up
@pytest.mark.parametrize(
    "store_backend,durable_kv_events,request_plane",
    [
        ("etcd", False, "tcp"),
    ],
    ids=["nats_core"],
    indirect=["durable_kv_events", "request_plane"],
)
def test_trtllm_indexers_sync(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    file_storage_backend,
    set_ucx_tls_no_mm,
    store_backend,
    durable_kv_events,
    request_plane,
):
    """Run the shared indexer-synchronization scenario with TRT-LLM workers.

    Delegates to ``run_indexers_sync_test`` with two workers. The only
    parametrized case ("nats_core") uses the etcd store backend, non-durable
    KV events and the tcp request plane.
    """
    run_indexers_sync_test(
        request=request,
        runtime_services_dynamic_ports=runtime_services_dynamic_ports,
        request_plane=request_plane,
        store_backend=store_backend,
        durable_kv_events=durable_kv_events,
        engine_process_cls=TRTLLMProcess,
        engine_args_name="trtllm_args",
        engine_args=TRTLLM_ARGS,
        model_name=MODEL_NAME,
        block_size=TRTLLM_BLOCK_SIZE,
        num_workers=2,
    )