# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Timing notes (measured in a TRT-LLM-enabled container):
# - GPU-1 subset (`-m "gpu_1"`): 136.36s total for 3 tests.
# These tests load a real model and can be slow/flaky when GPU resources are contended,
# so we set explicit pytest timeouts to fail fast on hangs (see per-test markers below).
8
9
10
11
12
13
import logging
import os
from typing import Any, Dict, Optional

import pytest

14
15
16
17
18
from tests.router.e2e_harness import (
    ManagedEngineProcessMixin,
    run_basic_router_test,
    run_indexers_sync_test,
    run_router_decisions_test,
19
)
20
from tests.router.helper import generate_random_suffix
21
from tests.utils.constants import DefaultPort
22
from tests.utils.managed_process import ManagedProcess
23
from tests.utils.port_utils import allocate_ports, deallocate_ports
24
25
26
27
28
29
30
31

logger = logging.getLogger(__name__)

# Model loaded by every TRT-LLM worker started from this module.
MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
TRTLLM_BLOCK_SIZE = 32  # fixed internally to 32

# Markers applied to every test collected from this module.
pytestmark = [
    pytest.mark.e2e,
    pytest.mark.router,
    pytest.mark.trtllm,
    pytest.mark.model(MODEL_NAME),
]

# Shared TRT-LLM configuration for all tests
# free_gpu_memory_fraction limits actual VRAM allocation (required for multi-worker on same GPU)
TRTLLM_ARGS: Dict[str, Any] = {
    "kv_block_size": TRTLLM_BLOCK_SIZE,
    "model": MODEL_NAME,
    "free_gpu_memory_fraction": 0.4,  # Limit VRAM allocation per worker
    "max_seq_len": 1024,  # Limit context length to reduce KV cache size
}


47
class TRTLLMProcess(ManagedEngineProcessMixin):
    """Manages TRT-LLM workers using dynamo.trtllm (HTTP API + KV events).

    This is a drop-in replacement for MockerProcess that uses real TRT-LLM workers.
    The key difference: dynamo.trtllm automatically handles:
    - HTTP API serving
    - KV cache event publishing
    - Integration with dynamo.frontend router
    """

    process_name = "TRT-LLM worker"
    cleanup_name = "TRT-LLM worker resources"

    def __init__(
        self,
        request,
        trtllm_args: Optional[Dict[str, Any]] = None,
        num_workers: int = 2,
        single_gpu: bool = False,
        request_plane: str = "tcp",
        store_backend: str = "etcd",
        durable_kv_events: bool = False,
    ):
        """Initialize TRT-LLM workers with dynamo integration.

        Only the ManagedProcess objects are created here; nothing in this
        constructor starts them.

        Args:
            request: pytest request fixture for log directory
            trtllm_args: Configuration dict with keys:
                - kv_block_size: KV cache block size (default: TRTLLM_BLOCK_SIZE, i.e. 32)
                - model: Model name/path (default: TinyLlama-1.1B)
                - free_gpu_memory_fraction: Fraction of GPU memory to allocate (optional)
                - max_seq_len: Maximum sequence length (optional)
                - tensor_parallel_size: Number of GPUs for tensor parallelism (optional).
                  When attention DP is enabled, this sets the world size, which then is the attention_dp_size.
                - enable_attention_dp: If True, enable TRT-LLM attention data parallelism.
                  When enabled, attention_dp_size equals tensor_parallel_size, creating
                  multiple routing targets within a single TRT-LLM worker process.
            num_workers: Number of TRT-LLM worker processes
            single_gpu: If True, all workers share GPU 0
            request_plane: Request plane to use ("nats", "tcp", or "http"). Defaults to "tcp".
            store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
            durable_kv_events: If True, use JetStream for durable KV events. Defaults to False (NATS Core mode).

        Note: TRT-LLM supports two forms of parallelism for routing:
              1. Multiple workers (num_workers > 1): Each worker is a separate routing target
              2. Attention DP (enable_attention_dp=True in trtllm_args): Single worker with
                 multiple internal attention DP ranks, each being a separate routing target
        """
        # Generate unique namespace for isolation
        namespace_suffix = generate_random_suffix()
        self.namespace = f"test-namespace-{namespace_suffix}"
        self.component_name = "tensorrt_llm"
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_workers = num_workers
        self.worker_processes = []
        self.store_backend = store_backend

        # Dynamically allocate unique system ports (one per worker) to avoid
        # conflicts when tests run in parallel via pytest-xdist.
        self._system_ports = allocate_ports(num_workers, DefaultPort.SYSTEM1.value)
        request.addfinalizer(lambda: deallocate_ports(self._system_ports))

        if trtllm_args is None:
            trtllm_args = {}

        model = trtllm_args.get("model", MODEL_NAME)
        # Honor the documented "kv_block_size" key instead of silently ignoring it.
        kv_block_size = trtllm_args.get("kv_block_size", TRTLLM_BLOCK_SIZE)
        free_gpu_memory_fraction = trtllm_args.get("free_gpu_memory_fraction")
        max_seq_len = trtllm_args.get("max_seq_len")
        enable_attention_dp = trtllm_args.get("enable_attention_dp", False)
        tensor_parallel_size = trtllm_args.get("tensor_parallel_size")

        self.model_name = model

        # The command line is identical for every worker, so build it once.
        base_command = self._build_worker_command(
            model=model,
            kv_block_size=kv_block_size,
            free_gpu_memory_fraction=free_gpu_memory_fraction,
            max_seq_len=max_seq_len,
            durable_kv_events=durable_kv_events,
            tensor_parallel_size=tensor_parallel_size,
            enable_attention_dp=enable_attention_dp,
        )

        for worker_idx in range(num_workers):
            gpu_device = self._visible_gpu_devices(
                worker_idx, single_gpu, enable_attention_dp, tensor_parallel_size
            )

            # Each TRT-LLM worker needs a unique DYN_SYSTEM_PORT to avoid conflicts.
            # Ports are dynamically allocated for xdist-safe parallel execution.
            system_port = self._system_ports[worker_idx]

            env = os.environ.copy()  # Copy parent environment
            env_vars = {
                "CUDA_VISIBLE_DEVICES": gpu_device,
                "DYN_NAMESPACE": self.namespace,
                "DYN_REQUEST_PLANE": request_plane,
                "PYTHONHASHSEED": "0",  # for deterministic event id's
                "DYN_SYSTEM_PORT": str(system_port),
            }

            # Add DYN_FILE_KV if using file storage backend
            if self.store_backend == "file" and "DYN_FILE_KV" in os.environ:
                env_vars["DYN_FILE_KV"] = os.environ["DYN_FILE_KV"]

            env.update(env_vars)

            # Create managed process for the worker
            process = ManagedProcess(
                command=list(base_command),  # private copy per worker
                env=env,
                timeout=180,  # Allow time for model loading (TRT-LLM may take longer)
                display_output=True,
                health_check_ports=[],
                health_check_urls=[],
                log_dir=request.node.name,
                terminate_all_matching_process_names=False,
            )
            self.worker_processes.append(process)
            logger.info(
                "Created TRT-LLM worker %s on GPU %s "
                "(gpu_mem_frac=%s, system_port=%s) "
                "with endpoint: %s",
                worker_idx,
                gpu_device,
                free_gpu_memory_fraction,
                system_port,
                self.endpoint,
            )

    @staticmethod
    def _visible_gpu_devices(
        worker_idx: int,
        single_gpu: bool,
        enable_attention_dp: bool,
        tensor_parallel_size: Optional[int],
    ) -> str:
        """Return the CUDA_VISIBLE_DEVICES value for the worker at ``worker_idx``."""
        if single_gpu:
            # Force all processes to GPU 0 (for single-GPU testing)
            return "0"
        if enable_attention_dp and tensor_parallel_size:
            # For attention DP, TRT-LLM spawns tensor_parallel_size internal MPI workers,
            # so the single process needs visibility into GPUs 0..tensor_parallel_size-1.
            return ",".join(str(i) for i in range(tensor_parallel_size))
        # Each worker sees one GPU
        return str(worker_idx)

    @staticmethod
    def _build_worker_command(
        model: str,
        kv_block_size: int,
        free_gpu_memory_fraction: Optional[float],
        max_seq_len: Optional[int],
        durable_kv_events: bool,
        tensor_parallel_size: Optional[int],
        enable_attention_dp: bool,
    ) -> list:
        """Return the ``python3 -m dynamo.trtllm`` argv shared by all workers."""
        # Single-node TRT-LLM workers use python3 -m dynamo.trtllm directly
        # (trtllm-llmapi-launch is only needed for multi-node MPI deployments)
        command = [
            "python3",
            "-m",
            "dynamo.trtllm",
            "--model-path",
            model,
            "--kv-block-size",
            str(kv_block_size),
            # Enable KV events publishing for router integration
            "--publish-events-and-metrics",
        ]

        # Limit VRAM allocation (required for multi-worker on same GPU)
        if free_gpu_memory_fraction is not None:
            command.extend(
                ["--free-gpu-memory-fraction", str(free_gpu_memory_fraction)]
            )

        # Add optional max_seq_len if specified
        if max_seq_len is not None:
            command.extend(["--max-seq-len", str(max_seq_len)])

        # Use --durable-kv-events to enable JetStream mode (local indexer disabled)
        if durable_kv_events:
            command.append("--durable-kv-events")

        # Set tensor parallel size if specified (needed for attention DP)
        if tensor_parallel_size is not None:
            command.extend(["--tensor-parallel-size", str(tensor_parallel_size)])

        # Enable attention data parallelism if requested
        if enable_attention_dp:
            command.append("--enable-attention-dp")

        return command
205
206
207


@pytest.mark.gpu_1
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(300)
def test_trtllm_kv_router_basic(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared basic router check against two TRT-LLM workers on one GPU.

    Delegates to ``run_basic_router_test`` with ``TRTLLMProcess`` as the engine
    and the module-wide ``TRTLLM_ARGS``. The fixtures that are not referenced
    in the body are requested only so pytest sets them up for this test.
    """
    run_basic_router_test(
        engine_process_cls=TRTLLMProcess,
        engine_args_name="trtllm_args",
        engine_args=TRTLLM_ARGS,
        num_workers=2,
        single_gpu=True,  # both workers share GPU 0
        request=request,
        request_plane=request_plane,
        block_size=TRTLLM_BLOCK_SIZE,
        model_name=MODEL_NAME,
    )
229
230


231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
@pytest.mark.gpu_2
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(600)  # 10 min max (multi-GPU + DP startup variance)
def test_router_decisions_trtllm_attention_dp(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Validate KV cache prefix reuse with TRT-LLM attention DP.

    Sends progressive requests with overlapping prefixes. Same flow as
    test_router_decisions_trtllm_multiple_workers, but the first request is
    forced to (worker_id, dp_rank=1).

    Dump events from router and verify:
        * All but one (worker_id, dp_rank) should have no events (due to prefix reuse)
        * The (worker_id, dp_rank) with events should have exactly 4 events (one per request)
        * All events should be on the forced (worker_id, dp_rank=1) (verifying forced routing and prefix reuse)
    """
    run_router_decisions_test(
        engine_process_cls=TRTLLMProcess,
        engine_args_name="trtllm_args",
        engine_args={
            **TRTLLM_ARGS,
            # One worker process with two attention DP ranks (tensor_parallel_size=2).
            "enable_attention_dp": True,
            "tensor_parallel_size": 2,
        },
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=TRTLLM_BLOCK_SIZE,
        component_name="tensorrt_llm",
        num_workers=1,
        single_gpu=False,
        test_dp_rank=True,
    )
266
267


268
@pytest.mark.gpu_1
@pytest.mark.nightly
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.timeout(150)  # ~3x average (~45s/test), rounded up
def test_router_decisions_trtllm_multiple_workers(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    set_ucx_tls_no_mm,
    request_plane,
):
    """Run the shared router-decisions check against two TRT-LLM workers on one GPU.

    Delegates to ``run_router_decisions_test`` with ``test_dp_rank=False``, so
    routing targets are the two worker processes rather than attention DP
    ranks. The fixtures that are not referenced in the body are requested only
    so pytest sets them up for this test.
    """
    run_router_decisions_test(
        engine_process_cls=TRTLLMProcess,
        engine_args_name="trtllm_args",
        engine_args=TRTLLM_ARGS,
        request=request,
        request_plane=request_plane,
        model_name=MODEL_NAME,
        block_size=TRTLLM_BLOCK_SIZE,
        component_name="tensorrt_llm",
        num_workers=2,
        single_gpu=True,  # both workers share GPU 0
        test_dp_rank=False,
    )
292
293
294


@pytest.mark.gpu_1
@pytest.mark.nightly
@pytest.mark.timeout(150)  # ~3x average (~45s/test), rounded up
@pytest.mark.parametrize(
    "store_backend,durable_kv_events,request_plane",
    [
        ("etcd", False, "tcp"),
    ],
    ids=["nats_core"],
    # store_backend is passed through as a plain value; the other two are
    # routed to the fixtures of the same name.
    indirect=["durable_kv_events", "request_plane"],
)
def test_trtllm_indexers_sync(
    request,
    runtime_services_dynamic_ports,
    predownload_models,
    file_storage_backend,
    set_ucx_tls_no_mm,
    store_backend,
    durable_kv_events,
    request_plane,
):
    """Run the shared indexer-sync check against two TRT-LLM workers.

    Delegates to ``run_indexers_sync_test`` with ``TRTLLMProcess`` as the
    engine. Only the single ``nats_core`` combination (etcd store, non-durable
    KV events, tcp request plane) is parametrized. The fixtures that are not
    referenced in the body are requested only so pytest sets them up for this
    test.
    """
    run_indexers_sync_test(
        engine_process_cls=TRTLLMProcess,
        engine_args_name="trtllm_args",
        engine_args=TRTLLM_ARGS,
        request=request,
        runtime_services_dynamic_ports=runtime_services_dynamic_ports,
        store_backend=store_backend,
        durable_kv_events=durable_kv_events,
        request_plane=request_plane,
        block_size=TRTLLM_BLOCK_SIZE,
        model_name=MODEL_NAME,
        num_workers=2,
    )