test_router_e2e_with_mockers.py 16.9 KB
Newer Older
1
2
3
4
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
5
from typing import Any, Dict, Optional
6
7
8

import pytest

9
10
11
12
13
14
15
16
17
18
19
from tests.router.common import (  # utilities
    _test_python_router_bindings,
    _test_router_basic,
    _test_router_decisions,
    _test_router_indexers_sync,
    _test_router_overload_503,
    _test_router_query_instance_id,
    _test_router_two_routers,
    generate_random_suffix,
    get_runtime,
)
Alec's avatar
Alec committed
20
from tests.utils.constants import ROUTER_MODEL_NAME
21
22
23
24
from tests.utils.managed_process import ManagedProcess

pytestmark = pytest.mark.pre_merge

logger = logging.getLogger(__name__)

# Model served by every mocker instance and referenced in the request payload.
MODEL_NAME = ROUTER_MODEL_NAME
# Number of mocker engine instances launched by most tests in this module.
NUM_MOCKERS = 2
# Passed to each mocker as --speedup-ratio.
SPEEDUP_RATIO = 10.0
BASE_PORT = 9100  # Base port for all tests (high port to avoid conflicts)
# Number of requests sent by the basic and two-router tests.
NUM_REQUESTS = 100
# Passed to each mocker as --block-size and to the router test helpers.
BLOCK_SIZE = 16


def get_unique_ports(
    request, num_ports: int = 1, store_backend: str = "etcd"
) -> list[int]:
    """Return a block of ports reserved for the calling test.

    The first port is ``BASE_PORT`` plus two offsets:

    - a per-test-function offset looked up by the test's name with any
      ``[...]`` parametrization suffix removed (unknown names use 0);
    - 50 for any ``store_backend`` other than ``"etcd"``.

    Consecutive ports follow, one per requested port.

    Args:
        request: Pytest request fixture; only ``request.node.name`` is read.
        num_ports: How many consecutive ports to return.
        store_backend: Storage backend parameter ("etcd" or "file").

    Returns:
        List of ``num_ports`` consecutive port numbers.
    """
    offsets_by_test = {
        "test_mocker_kv_router": 0,
        "test_mocker_two_kv_router": 100,
        "test_mocker_kv_router_overload_503": 200,
        "test_query_instance_id_returns_worker_and_tokens": 300,
    }

    # "test_x[file]" and "test_x[etcd]" share the same per-test offset.
    base_name, _, _ = request.node.name.partition("[")

    first_port = BASE_PORT + offsets_by_test.get(base_name, 0)
    if store_backend != "etcd":
        first_port += 50

    return list(range(first_port, first_port + num_ports))


# Shared test payload for all tests: a streaming chat request for MODEL_NAME
# with a single user message and a 10-token generation cap.
TEST_PAYLOAD: Dict[str, Any] = dict(
    model=MODEL_NAME,
    messages=[
        dict(
            role="user",
            content=(
                "In a quiet meadow tucked between rolling hills, a plump gray rabbit "
                "nibbled on clover beneath the shade of a gnarled oak tree. "
                "Its ears twitched at the faint rustle of leaves, but it remained calm, "
                "confident in the safety of its burrow just a few hops away. "
                "The late afternoon sun warmed its fur, and tiny dust motes danced in "
                "the golden light as bees hummed lazily nearby. "
                "Though the rabbit lived a simple life, every day was an adventure of "
                "scents, shadows, and snacks—an endless search for the tastiest patch "
                "of greens and the softest spot to nap."
            ),
        )
    ],
    stream=True,
    max_tokens=10,
)


class MockerProcess:
    """Manages multiple mocker engine instances with the same namespace.

    Every instance is launched as ``python -m dynamo.mocker`` with the same
    ``--endpoint`` (``dyn://<namespace>.mocker.generate``), so all of them
    share one randomly generated namespace. ``__enter__`` starts the
    instances and ``__exit__`` stops the ones whose start was attempted.
    """

    # (mocker_args key, CLI flag, is_boolean_toggle), in the order the flags
    # are appended to the command line. A boolean toggle emits the flag when
    # the value is truthy and its "--no-" form otherwise; every other entry
    # emits "<flag> str(value)".
    _CLI_FLAGS = (
        ("speedup_ratio", "--speedup-ratio", False),
        ("block_size", "--block-size", False),
        ("num_gpu_blocks", "--num-gpu-blocks-override", False),
        ("max_num_seqs", "--max-num-seqs", False),
        ("max_num_batched_tokens", "--max-num-batched-tokens", False),
        ("enable_prefix_caching", "--enable-prefix-caching", True),
        ("enable_chunked_prefill", "--enable-chunked-prefill", True),
        ("watermark", "--watermark", False),
        ("dp_size", "--data-parallel-size", False),
    )

    def __init__(
        self,
        request,
        mocker_args: Optional[Dict[str, Any]] = None,
        num_mockers: int = 1,
        store_backend: str = "etcd",
    ):
        """Create (but do not start) ``num_mockers`` mocker processes.

        Args:
            request: Pytest request fixture; ``request.node.name`` is used as
                the log directory of every instance.
            mocker_args: Optional mocker settings. Only the keys listed in
                ``_CLI_FLAGS`` are translated into CLI flags; other keys are
                ignored.
            num_mockers: Number of mocker instances to create.
            store_backend: Value passed to ``--store-kv``.
        """
        # Generate a unique namespace suffix shared by all mockers
        namespace_suffix = generate_random_suffix()
        self.namespace = f"test-namespace-{namespace_suffix}"
        self.component_name = "mocker"
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_mockers = num_mockers
        self.num_workers = self.num_mockers  # for compatibility with common.py
        self.mocker_processes = []
        # Instances whose start has been attempted; only these are stopped.
        self._started_processes = []

        # Every instance runs the same command, so build it once.
        command = self._build_command(
            MODEL_NAME, self.endpoint, store_backend, mocker_args or {}
        )

        # Create multiple mocker processes with the same namespace
        for i in range(num_mockers):
            process = ManagedProcess(
                command=list(command),  # private copy per process
                timeout=60,
                display_output=True,
                health_check_ports=[],
                health_check_urls=[],
                log_dir=request.node.name,
                terminate_existing=False,
            )
            self.mocker_processes.append(process)
            logger.info(f"Created mocker instance {i} with endpoint: {self.endpoint}")

    @classmethod
    def _build_command(
        cls,
        model_path: str,
        endpoint: str,
        store_backend: str,
        mocker_args: Dict[str, Any],
    ) -> list[str]:
        """Return the ``dynamo.mocker`` command line for a single instance.

        Args:
            model_path: Value for ``--model-path``.
            endpoint: Value for ``--endpoint``.
            store_backend: Value for ``--store-kv``.
            mocker_args: Settings translated into flags via ``_CLI_FLAGS``;
                keys that are absent produce no flag.

        Returns:
            The argument list, flags in ``_CLI_FLAGS`` order.
        """
        command = [
            "python",
            "-m",
            "dynamo.mocker",
            "--model-path",
            model_path,
            "--endpoint",
            endpoint,
            "--store-kv",
            store_backend,
        ]
        for key, flag, is_toggle in cls._CLI_FLAGS:
            if key not in mocker_args:
                continue
            value = mocker_args[key]
            if is_toggle:
                # "--enable-x" -> "--no-enable-x" for a falsy value.
                command.append(flag if value else "--no-" + flag[2:])
            else:
                command.extend([flag, str(value)])
        return command

    def __enter__(self):
        """Start all mocker processes in creation order.

        If starting an instance raises, every instance whose start was
        attempted (including the failing one) is passed to ``__exit__``
        before the exception propagates, so a failed start does not leave
        earlier instances behind when used in a ``with`` statement.
        """
        for i, process in enumerate(self.mocker_processes):
            logger.info(f"Starting mocker instance {i}")
            # Record before entering so a partially started instance is
            # still stopped by __exit__.
            self._started_processes.append(process)
            try:
                process.__enter__()
            except BaseException as err:
                self.__exit__(type(err), err, err.__traceback__)
                raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop every mocker instance whose start was attempted.

        Stopping continues even if one instance's ``__exit__`` raises; the
        first such error is re-raised after all instances were handled. The
        set of started instances is cleared first, so calling this again
        (e.g. from a caller's ``finally`` after a failed ``__enter__``) is a
        no-op.
        """
        started, self._started_processes = self._started_processes, []
        first_error = None
        for i, process in enumerate(started):
            logger.info(f"Stopping mocker instance {i}")
            try:
                process.__exit__(exc_type, exc_val, exc_tb)
            except Exception as err:
                logger.exception(f"Failed to stop mocker instance {i}")
                if first_error is None:
                    first_error = err
        if first_error is not None:
            raise first_error


@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.model(MODEL_NAME)
def test_mocker_kv_router(request, runtime_services_session, predownload_tokenizers):
    """
    Test KV router with multiple mocker engine instances.
    This test doesn't require GPUs and runs quickly for pre-merge validation.
    """

    # runtime_services starts etcd and nats
    logger.info("Starting mocker KV router test")

    # Create mocker args dictionary
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}

    # Create mocker instances with the new CLI interface. Construction happens
    # before the try so the finally below always has an object to clean up.
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS)
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    try:
        mockers.__enter__()

        # Get unique port for this test
        frontend_port = get_unique_ports(request, num_ports=1)[0]

        # Run basic router test (starts router internally and waits for workers to be ready)
        _test_router_basic(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            request=request,
            frontend_port=frontend_port,
            test_payload=TEST_PAYLOAD,
            num_requests=NUM_REQUESTS,
        )
    finally:
        mockers.__exit__(None, None, None)


@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.model(MODEL_NAME)
@pytest.mark.parametrize("store_backend", ["etcd", "file"])
def test_mocker_two_kv_router(
    request,
    runtime_services_session,
    predownload_tokenizers,
    file_storage_backend,
    store_backend,
):
    """
    Test with two KV routers and multiple mocker engine instances.
    Alternates requests between the two routers to test load distribution.
    Tests with both etcd and file storage backends.
    """

    # runtime_services starts etcd and nats
    logger.info(
        f"Starting mocker two KV router test with {store_backend} storage backend"
    )

    # Create mocker args dictionary
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}

    # Create mocker instances with the new CLI interface. Construction happens
    # before the try so the finally below always has an object to clean up.
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(
        request,
        mocker_args=mocker_args,
        num_mockers=NUM_MOCKERS,
        store_backend=store_backend,
    )
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    try:
        mockers.__enter__()

        # Get unique ports for this test (2 ports for two routers)
        router_ports = get_unique_ports(
            request, num_ports=2, store_backend=store_backend
        )

        # Run two-router test (starts KV routers internally and manages their lifecycle)
        _test_router_two_routers(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            request=request,
            router_ports=router_ports,
            test_payload=TEST_PAYLOAD,
            num_requests=NUM_REQUESTS,
            store_backend=store_backend,
        )
    finally:
        mockers.__exit__(None, None, None)


@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.model(MODEL_NAME)
@pytest.mark.skip(reason="Flaky, temporarily disabled")
def test_mocker_kv_router_overload_503(
    request, runtime_services_session, predownload_tokenizers
):
    """Test that KV router returns 503 when mocker workers are overloaded."""
    logger.info("Starting mocker KV router overload test for 503 status")

    # Smaller block size; shared by the mocker and the router helper so the
    # two can never disagree.
    block_size = 4

    # Create mocker args dictionary with limited resources
    mocker_args = {
        "speedup_ratio": 10,
        "block_size": block_size,
        "num_gpu_blocks": 64,  # Limited GPU blocks to exhaust quickly
    }

    # Create a single mocker instance with limited resources. Construction
    # happens before the try so the finally below always has an object.
    logger.info("Starting single mocker instance with limited resources")
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=1)
    logger.info(f"Mocker using endpoint: {mockers.endpoint}")

    try:
        mockers.__enter__()

        # Get unique port for this test
        frontend_port = get_unique_ports(request, num_ports=1)[0]

        # Run overload 503 test
        _test_router_overload_503(
            engine_workers=mockers,
            block_size=block_size,  # Match the mocker's block size
            request=request,
            frontend_port=frontend_port,
            test_payload=TEST_PAYLOAD,
            busy_threshold=0.2,
        )
    finally:
        mockers.__exit__(None, None, None)


@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.model(MODEL_NAME)
def test_kv_push_router_bindings(
    request, runtime_services_session, predownload_tokenizers
):
    """Test KvPushRouter Python bindings with mocker engines."""
    logger.info("Starting KvPushRouter bindings test")
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}

    # Create mocker instances. Construction happens before the try so the
    # finally below always has an object to clean up.
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS)
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    try:
        mockers.__enter__()

        # Get runtime and create endpoint
        runtime = get_runtime()
        namespace = runtime.namespace(mockers.namespace)
        component = namespace.component(mockers.component_name)
        endpoint = component.endpoint("generate")

        # Run Python router bindings test
        _test_python_router_bindings(
            engine_workers=mockers,
            endpoint=endpoint,
            block_size=BLOCK_SIZE,
            model_name=MODEL_NAME,
            num_workers=NUM_MOCKERS,
        )
    finally:
        mockers.__exit__(None, None, None)


@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.model(MODEL_NAME)
@pytest.mark.parametrize("store_backend", ["etcd", "file"])
def test_indexers_sync(
    request,
    runtime_services_session,
    predownload_tokenizers,
    file_storage_backend,
    store_backend,
):
    """
    Test that two KV routers have synchronized indexer states after processing requests.
    This test verifies that both routers converge to the same internal state.
    Tests with both etcd and file storage backends.
    """

    # runtime_services starts etcd and nats
    logger.info(f"Starting indexers sync test with {store_backend} storage backend")

    # Create mocker args dictionary
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}

    # Create mocker instances. Construction happens before the try so the
    # finally below always has an object to clean up.
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(
        request,
        mocker_args=mocker_args,
        num_mockers=NUM_MOCKERS,
        store_backend=store_backend,
    )
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    try:
        mockers.__enter__()

        # Use the common test implementation (creates its own runtimes for each router)
        # Note: Consumer verification is done inside _test_router_indexers_sync while routers are alive
        _test_router_indexers_sync(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            model_name=MODEL_NAME,
            num_workers=NUM_MOCKERS,
            store_backend=store_backend,
        )

        logger.info("Indexers sync test completed successfully")
    finally:
        mockers.__exit__(None, None, None)


@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.model(MODEL_NAME)
def test_query_instance_id_returns_worker_and_tokens(
    request, runtime_services_session, predownload_tokenizers
):
    """Test query_instance_id annotation with mocker engines."""
    logger.info("Starting KV router query_instance_id annotation test")
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
    # The mockers use request.node.name as their log directory.
    os.makedirs(request.node.name, exist_ok=True)

    # Create mocker instances. Construction happens before the try so the
    # finally below always has an object to clean up.
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS)
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    try:
        mockers.__enter__()

        # Get unique port for this test
        frontend_port = get_unique_ports(request, num_ports=1)[0]

        # Run query_instance_id annotation test
        _test_router_query_instance_id(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            request=request,
            frontend_port=frontend_port,
            test_payload=TEST_PAYLOAD,
        )
    finally:
        mockers.__exit__(None, None, None)


@pytest.mark.pre_merge
@pytest.mark.parallel
@pytest.mark.model(MODEL_NAME)
def test_router_decisions(request, runtime_services_session, predownload_tokenizers):
    """Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes."""

    # runtime_services starts etcd and nats
    logger.info("Starting test router prefix reuse and KV events synchronization")

    # Create mocker args dictionary with dp_size=4
    mocker_args = {
        "speedup_ratio": SPEEDUP_RATIO,
        "block_size": BLOCK_SIZE,
        "dp_size": 4,
    }

    # Create mocker instances. Construction happens before the try so the
    # finally below always has an object to clean up.
    logger.info(
        "Starting 2 mocker instances with dp_size=4 each (8 total dp ranks)"
    )
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=2)
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    try:
        # Initialize mockers
        mockers.__enter__()

        # Get runtime and create an endpoint handle in the mockers' namespace
        runtime = get_runtime()
        namespace = runtime.namespace(mockers.namespace)
        component = namespace.component(mockers.component_name)
        endpoint = component.endpoint("generate")

        _test_router_decisions(
            mockers, endpoint, MODEL_NAME, request, test_dp_rank=True
        )
    finally:
        mockers.__exit__(None, None, None)