# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Parallelization: Hermetic tests (xdist-safe via dynamic ports + per-test namespaces).
# Tested on: Linux container.
# Combined pre_merge wall time (this file):
# - Serialized: 304.01s.
# - Parallel (-n auto): 34.55s (269.46s saved, 8.80x).
9
10
import logging
import os
11
from typing import Any, Dict, Optional
12
13
14

import pytest

15
from tests.router.common import (  # utilities
16
    _test_busy_threshold_endpoint,
17
18
19
    _test_python_router_bindings,
    _test_router_basic,
    _test_router_decisions,
20
    _test_router_decisions_disagg,
21
22
23
24
25
26
27
    _test_router_indexers_sync,
    _test_router_overload_503,
    _test_router_query_instance_id,
    _test_router_two_routers,
    generate_random_suffix,
    get_runtime,
)
Alec's avatar
Alec committed
28
from tests.utils.constants import ROUTER_MODEL_NAME
29
from tests.utils.managed_process import ManagedProcess
30
from tests.utils.port_utils import allocate_ports, deallocate_ports
31

32
33
34
35
logger = logging.getLogger(__name__)

# Model served by every mocker in this module; also attached to each test via
# the `model` marker below.
MODEL_NAME = ROUTER_MODEL_NAME

# Markers applied to every test in this module.
pytestmark = [
    pytest.mark.pre_merge,
    pytest.mark.gpu_0,
    pytest.mark.integration,
    pytest.mark.parallel,
    pytest.mark.model(MODEL_NAME),
]

NUM_MOCKERS = 2  # Default number of mocker workers started per test
SPEEDUP_RATIO = 10.0  # Passed to the mocker CLI as --speedup-ratio
BASE_PORT = 9100  # Base port for all tests (high port to avoid conflicts)
NUM_REQUESTS = 100  # Request count handed to the shared router test helpers
BLOCK_SIZE = 16  # KV block size shared by the mockers and the router helpers
48
49


50
def get_unique_ports(
    request,
    num_ports: int = 1,
    store_backend: str = "etcd",
    request_plane: str = "nats",
    registration_order: str = "prefill_first",
) -> list[int]:
    """Allocate free ports for xdist-safe router tests.

    This replaces the previous "test-name offset" scheme with the shared
    allocator from `tests.utils.port_utils`, which avoids collisions across
    pytest-xdist worker processes.

    Args:
        request: The pytest `request` fixture; used to register the finalizer
            that releases the ports.
        num_ports: How many ports to allocate.
        store_backend: Ignored; kept for call-site compatibility.
        request_plane: Ignored; kept for call-site compatibility.
        registration_order: Ignored; kept for call-site compatibility.

    Returns:
        The ports returned by `allocate_ports(num_ports, BASE_PORT)`.
    """
    # The legacy parameters no longer influence which ports are chosen; they
    # are referenced once so linters do not flag them as unused.
    _ = (store_backend, request_plane, registration_order)

    ports = allocate_ports(num_ports, BASE_PORT)
    # Hand the ports back when the requesting test finishes.
    request.addfinalizer(lambda: deallocate_ports(ports))
    return ports


74
75
76
77
78
79
80
81
82
83
84
85
86
# Shared chat-completions style payload sent by every test in this module:
# a single long user message, streamed, with the completion capped at 10 tokens.
TEST_PAYLOAD: Dict[str, Any] = {
    "model": MODEL_NAME,
    "messages": [
        {
            "role": "user",
            "content": "In a quiet meadow tucked between rolling hills, a plump gray rabbit nibbled on clover beneath the shade of a gnarled oak tree. Its ears twitched at the faint rustle of leaves, but it remained calm, confident in the safety of its burrow just a few hops away. The late afternoon sun warmed its fur, and tiny dust motes danced in the golden light as bees hummed lazily nearby. Though the rabbit lived a simple life, every day was an adventure of scents, shadows, and snacks—an endless search for the tastiest patch of greens and the softest spot to nap.",
        }
    ],
    "stream": True,
    "max_tokens": 10,
}

87

88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def _build_mocker_command(
    endpoint: str,
    store_backend: str,
    num_workers: int,
    mocker_args: Dict[str, Any],
    worker_type: Optional[str] = None,
) -> list[str]:
    """Build the mocker CLI command with all arguments.

    Args:
        endpoint: The dynamo endpoint string
        store_backend: Storage backend ("etcd" or "file")
        num_workers: Number of workers to spawn (uses --num-workers flag)
        mocker_args: Dictionary of mocker arguments. Only the keys listed in
            the option table below (plus "enable_local_indexer") are
            translated into flags; any other key is ignored.
        worker_type: Optional worker type ("prefill" or "decode") for disagg
            mode; any other value adds no flag.

    Returns:
        List of command arguments for subprocess
    """
    command = [
        "python",
        "-m",
        "dynamo.mocker",
        "--model-path",
        MODEL_NAME,
        "--endpoint",
        endpoint,
        "--store-kv",
        store_backend,
        "--num-workers",
        str(num_workers),
    ]

    # Worker-role flag for disaggregated mode.
    role_flag = {
        "prefill": "--is-prefill-worker",
        "decode": "--is-decode-worker",
    }.get(worker_type)
    if role_flag is not None:
        command.append(role_flag)

    # (mocker_args key, CLI flag, takes_value), listed in emission order.
    # Value options emit "<flag> <str(value)>"; boolean toggles emit either
    # the flag itself or its "--no-" counterpart, but only when the key is
    # present at all.
    option_table = (
        ("speedup_ratio", "--speedup-ratio", True),
        ("block_size", "--block-size", True),
        ("num_gpu_blocks", "--num-gpu-blocks-override", True),
        ("max_num_seqs", "--max-num-seqs", True),
        ("max_num_batched_tokens", "--max-num-batched-tokens", True),
        ("enable_prefix_caching", "--enable-prefix-caching", False),
        ("enable_chunked_prefill", "--enable-chunked-prefill", False),
        ("watermark", "--watermark", True),
        ("dp_size", "--data-parallel-size", True),
    )
    for key, flag, takes_value in option_table:
        if key not in mocker_args:
            continue
        value = mocker_args[key]
        if takes_value:
            command.extend([flag, str(value)])
        elif value:
            command.append(flag)
        else:
            command.append("--no-" + flag[2:])

    # The local indexer has no negative form: emitted only when truthy.
    if mocker_args.get("enable_local_indexer"):
        command.append("--enable-local-indexer")

    return command


162
class MockerProcess:
    """Manages mocker engine instances with shared tokio runtime via --num-workers.

    Each instance gets its own randomly suffixed namespace and launches a
    single `dynamo.mocker` process (wrapped in a `ManagedProcess`) that hosts
    `num_mockers` workers. Use it as a context manager, or call
    `__enter__` / `__exit__` explicitly.
    """

    def __init__(
        self,
        request,
        mocker_args: Optional[Dict[str, Any]] = None,
        num_mockers: int = 1,
        store_backend: str = "etcd",
        request_plane: str = "nats",
    ):
        """Prepare (but do not start) the mocker process.

        Args:
            request: The pytest `request` fixture; the test node name is used
                as the process log directory.
            mocker_args: Optional mocker settings forwarded to
                `_build_mocker_command`.
            num_mockers: Number of workers passed via --num-workers.
            store_backend: Storage backend passed via --store-kv.
            request_plane: Value exported to the child process as
                DYN_REQUEST_PLANE.
        """
        namespace_suffix = generate_random_suffix()
        self.namespace = f"test-namespace-{namespace_suffix}"
        self.component_name = "mocker"
        self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
        self.num_workers = num_mockers

        mocker_args = mocker_args or {}

        command = _build_mocker_command(
            endpoint=self.endpoint,
            store_backend=store_backend,
            num_workers=num_mockers,
            mocker_args=mocker_args,
        )

        # Child inherits the current environment plus the requested request plane.
        env = os.environ.copy()
        env["DYN_REQUEST_PLANE"] = request_plane

        self._process = ManagedProcess(
            command=command,
            env=env,
            timeout=60,
            display_output=True,
            health_check_ports=[],
            health_check_urls=[],
            log_dir=request.node.name,
            terminate_existing=False,
        )
        logger.info(
            f"Created mocker process with {num_mockers} worker(s), endpoint: {self.endpoint}"
        )

    def __enter__(self):
        """Start the underlying managed process and return this wrapper."""
        logger.info(f"Starting mocker process with {self.num_workers} worker(s)")
        self._process.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the underlying managed process, forwarding the exception info."""
        logger.info("Stopping mocker process")
        self._process.__exit__(exc_type, exc_val, exc_tb)


class DisaggMockerProcess:
    """Manages prefill or decode mocker instances for disaggregated serving.

    Uses --num-workers for shared tokio runtime. For disaggregated serving:
    - Prefill workers: worker_type="prefill", endpoint is namespace.prefill.generate
    - Decode workers: worker_type="decode", endpoint is namespace.backend.generate

    Both prefill and decode workers should share the same namespace for proper discovery.
    """

    def __init__(
        self,
        request,
        namespace: str,
        worker_type: str,
        mocker_args: Optional[Dict[str, Any]] = None,
        num_mockers: int = 1,
        store_backend: str = "etcd",
        request_plane: Optional[str] = None,
    ):
        """Prepare (but do not start) a prefill or decode mocker process.

        Args:
            request: The pytest `request` fixture; the test node name is used
                as the process log directory.
            namespace: Namespace shared by the prefill and decode workers.
            worker_type: Either "prefill" or "decode".
            mocker_args: Optional mocker settings forwarded to
                `_build_mocker_command`.
            num_mockers: Number of workers passed via --num-workers.
            store_backend: Storage backend passed via --store-kv.
            request_plane: When given, exported to the child process as
                DYN_REQUEST_PLANE (mirroring `MockerProcess`). When omitted,
                no explicit environment is passed to `ManagedProcess`,
                exactly as before this parameter existed.

        Raises:
            ValueError: If `worker_type` is neither "prefill" nor "decode".
        """
        if worker_type not in ("prefill", "decode"):
            raise ValueError(
                f"worker_type must be 'prefill' or 'decode', got {worker_type}"
            )

        self.namespace = namespace
        self.worker_type = worker_type
        self.num_workers = num_mockers

        # Set component name and endpoint based on worker type
        if worker_type == "prefill":
            self.component_name = "prefill"
            self.endpoint = f"dyn://{self.namespace}.prefill.generate"
        else:
            self.component_name = "backend"
            self.endpoint = f"dyn://{self.namespace}.backend.generate"

        mocker_args = mocker_args or {}

        command = _build_mocker_command(
            endpoint=self.endpoint,
            store_backend=store_backend,
            num_workers=num_mockers,
            mocker_args=mocker_args,
            worker_type=worker_type,
        )

        # Only override the environment when a request plane was asked for,
        # so existing callers keep the previous ManagedProcess invocation.
        process_kwargs: Dict[str, Any] = {}
        if request_plane is not None:
            env = os.environ.copy()
            env["DYN_REQUEST_PLANE"] = request_plane
            process_kwargs["env"] = env

        self._process = ManagedProcess(
            command=command,
            timeout=60,
            display_output=True,
            health_check_ports=[],
            health_check_urls=[],
            log_dir=request.node.name,
            terminate_existing=False,
            **process_kwargs,
        )
        logger.info(
            f"Created {worker_type} mocker process with {num_mockers} worker(s), "
            f"endpoint: {self.endpoint}"
        )

    def __enter__(self):
        """Start the underlying managed process and return this wrapper."""
        logger.info(
            f"Starting {self.worker_type} mocker process with {self.num_workers} worker(s)"
        )
        self._process.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the underlying managed process, forwarding the exception info."""
        logger.info(f"Stopping {self.worker_type} mocker process")
        self._process.__exit__(exc_type, exc_val, exc_tb)
285
286


287
288
289
290
@pytest.mark.timeout(42)  # ~3x average (~13.80s), rounded up
def test_mocker_kv_router(
    request, runtime_services_dynamic_ports, predownload_tokenizers
):
    """
    Test KV router with multiple mocker engine instances.
    This test doesn't require GPUs and runs quickly for pre-merge validation.
    """

    # runtime_services starts etcd and nats
    logger.info("Starting mocker KV router test")

    # Create mocker args dictionary
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}

    # Start mocker instances with the new CLI interface
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS)
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    # __enter__ is called inside the try so that __exit__ still runs if
    # start-up fails partway through.
    try:
        mockers.__enter__()

        # Get unique port for this test
        frontend_port = get_unique_ports(request, num_ports=1)[0]

        # Run basic router test (starts router internally and waits for workers to be ready)
        _test_router_basic(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            request=request,
            frontend_port=frontend_port,
            test_payload=TEST_PAYLOAD,
            num_requests=NUM_REQUESTS,
        )
    finally:
        mockers.__exit__(None, None, None)
327
328


329
@pytest.mark.parametrize("store_backend", ["etcd", "file"])
@pytest.mark.timeout(60)  # ~3x average (~19.86s), rounded up
def test_mocker_two_kv_router(
    request,
    runtime_services_dynamic_ports,
    predownload_tokenizers,
    file_storage_backend,
    store_backend,
):
    """
    Test with two KV routers and multiple mocker engine instances.
    Alternates requests between the two routers to test load distribution.
    Tests with both etcd and file storage backends.
    """

    # runtime_services starts etcd and nats
    logger.info(
        f"Starting mocker two KV router test with {store_backend} storage backend"
    )

    # Create mocker args dictionary
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}

    # Start mocker instances with the new CLI interface
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(
        request,
        mocker_args=mocker_args,
        num_mockers=NUM_MOCKERS,
        store_backend=store_backend,
    )
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    # __enter__ is called inside the try so that __exit__ still runs if
    # start-up fails partway through.
    try:
        mockers.__enter__()

        # Get unique ports for this test (2 ports for two routers)
        router_ports = get_unique_ports(
            request, num_ports=2, store_backend=store_backend
        )

        # Run two-router test (starts KV routers internally and manages their lifecycle)
        _test_router_two_routers(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            request=request,
            router_ports=router_ports,
            test_payload=TEST_PAYLOAD,
            num_requests=NUM_REQUESTS,
            store_backend=store_backend,
        )
    finally:
        mockers.__exit__(None, None, None)
383
384


385
@pytest.mark.skip(reason="Flaky, temporarily disabled")
@pytest.mark.timeout(60)  # ~3x average (~19.86s), rounded up (when enabled)
def test_mocker_kv_router_overload_503(
    request, runtime_services_dynamic_ports, predownload_tokenizers
):
    """Test that KV router returns 503 when mocker workers are overloaded."""
    logger.info("Starting mocker KV router overload test for 503 status")

    # Smaller block size than the module default; defined once so the mocker
    # and the router helper cannot drift apart.
    overload_block_size = 4

    # Create mocker args dictionary with limited resources
    mocker_args = {
        "speedup_ratio": 10,
        "block_size": overload_block_size,
        "num_gpu_blocks": 64,  # Limited GPU blocks to exhaust quickly
    }

    # Start single mocker instance with limited resources
    logger.info("Starting single mocker instance with limited resources")
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=1)
    logger.info(f"Mocker using endpoint: {mockers.endpoint}")

    # __enter__ is called inside the try so that __exit__ still runs if
    # start-up fails partway through.
    try:
        mockers.__enter__()

        # Get unique port for this test
        frontend_port = get_unique_ports(request, num_ports=1)[0]

        # Run overload 503 test
        _test_router_overload_503(
            engine_workers=mockers,
            block_size=overload_block_size,  # Match the mocker's block size
            request=request,
            frontend_port=frontend_port,
            test_payload=TEST_PAYLOAD,
            blocks_threshold=0.2,
        )
    finally:
        mockers.__exit__(None, None, None)
422

423

424
@pytest.mark.timeout(22)  # ~3x average (~7.10s), rounded up
def test_kv_push_router_bindings(
    request, runtime_services_dynamic_ports, predownload_tokenizers
):
    """Test KvPushRouter Python bindings with mocker engines."""
    logger.info("Starting KvPushRouter bindings test")
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}

    # Start mocker instances
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS)
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    # __enter__ is called inside the try so that __exit__ still runs if
    # start-up fails partway through.
    try:
        mockers.__enter__()

        # Get runtime and create endpoint
        runtime = get_runtime()
        namespace = runtime.namespace(mockers.namespace)
        component = namespace.component(mockers.component_name)
        endpoint = component.endpoint("generate")

        # Run Python router bindings test
        _test_python_router_bindings(
            engine_workers=mockers,
            endpoint=endpoint,
            block_size=BLOCK_SIZE,
            model_name=MODEL_NAME,
            num_workers=NUM_MOCKERS,
        )
    finally:
        mockers.__exit__(None, None, None)


461
462
463
464
465
466
467
468
469
470
# The nats_core variant stops/restarts NATS and therefore must not run in
# parallel with other tests; it is currently commented out below.
# NOTE(review): the module-level `pytestmark` still applies
# `pytest.mark.parallel` to this test - confirm how that should be handled
# before re-enabling the nats_core case.
@pytest.mark.parametrize(
    "store_backend,use_nats_core,request_plane",
    [
        ("etcd", False, "nats"),  # JetStream mode
        # ("etcd", True, "tcp"),  # ignored, needs unconditional nats_client
        ("file", False, "nats"),  # File backend
    ],
    ids=["jetstream", "file"],  # "nats_core" commented out to match commented test case
)
@pytest.mark.timeout(27)  # ~3x average (~8.93s), rounded up
def test_indexers_sync(
    request,
    runtime_services_dynamic_ports,
    predownload_tokenizers,
    file_storage_backend,
    store_backend,
    use_nats_core,
    request_plane,
):
    """
    Test that two KV routers have synchronized indexer states after processing requests.
    This test verifies that both routers converge to the same internal state.

    Configurations:
    - jetstream: etcd backend, JetStream for KV events, NATS request plane
    - file: file backend, JetStream for KV events, NATS request plane
    - nats_core (currently disabled in the parametrize list): etcd backend,
      local indexer with NATS Core, TCP request plane
      (includes NATS interruption/recovery testing)
    """
    logger.info(
        f"Starting indexers sync test: store_backend={store_backend}, "
        f"use_nats_core={use_nats_core}, request_plane={request_plane}"
    )

    # Use the dynamic-port fixture to avoid hardcoded localhost:4222/2379 in parallel runs.
    nats_process, _etcd_process = runtime_services_dynamic_ports

    # Create mocker args dictionary
    mocker_args = {
        "speedup_ratio": SPEEDUP_RATIO,
        "block_size": BLOCK_SIZE,
        "enable_local_indexer": use_nats_core,
    }

    # Start mocker instances
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(
        request,
        mocker_args=mocker_args,
        num_mockers=NUM_MOCKERS,
        store_backend=store_backend,
        request_plane=request_plane,
    )
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    # __enter__ is called inside the try so that __exit__ still runs if
    # start-up fails partway through.
    try:
        mockers.__enter__()

        # Use the common test implementation (creates its own runtimes for each router)
        # Note: Consumer verification is done inside _test_router_indexers_sync while routers are alive
        _test_router_indexers_sync(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            model_name=MODEL_NAME,
            num_workers=NUM_MOCKERS,
            store_backend=store_backend,
            request_plane=request_plane,
            test_nats_interruption=use_nats_core,
            nats_server=nats_process if use_nats_core else None,
        )

        logger.info("Indexers sync test completed successfully")
    finally:
        mockers.__exit__(None, None, None)


@pytest.mark.timeout(42)  # ~3x average (~13.80s), rounded up
def test_query_instance_id_returns_worker_and_tokens(
    request, runtime_services_dynamic_ports, predownload_tokenizers
):
    """Test query_instance_id annotation with mocker engines."""
    logger.info("Starting KV router query_instance_id annotation test")
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
    # Create the per-test directory (named after the test node) up front.
    os.makedirs(request.node.name, exist_ok=True)

    # Start mocker instances
    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS)
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    # __enter__ is called inside the try so that __exit__ still runs if
    # start-up fails partway through.
    try:
        mockers.__enter__()

        # Get unique port for this test
        frontend_port = get_unique_ports(request, num_ports=1)[0]

        # Run query_instance_id annotation test
        _test_router_query_instance_id(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            request=request,
            frontend_port=frontend_port,
            test_payload=TEST_PAYLOAD,
        )
    finally:
        mockers.__exit__(None, None, None)
572
573


574
@pytest.mark.parametrize("use_nats_core", [False, True], ids=["jetstream", "nats_core"])
@pytest.mark.timeout(29)  # ~3x average (~9.55s), rounded up
def test_router_decisions(
    request, runtime_services_dynamic_ports, predownload_tokenizers, use_nats_core
):
    """Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes.

    Parameterized to test both JetStream (default) and NATS Core (local indexer) modes.
    """

    # runtime_services starts etcd and nats
    mode = "NATS Core (local indexer)" if use_nats_core else "JetStream"
    logger.info(
        f"Starting test router prefix reuse and KV events synchronization ({mode})"
    )

    # Create mocker args dictionary with dp_size=4
    mocker_args = {
        "speedup_ratio": SPEEDUP_RATIO,
        "block_size": BLOCK_SIZE,
        "dp_size": 4,
        "enable_local_indexer": use_nats_core,
    }

    logger.info(
        f"Starting 2 mocker instances with dp_size=4 each (8 total dp ranks), {mode}"
    )
    mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=2)
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    # __enter__ is called inside the try so that __exit__ still runs if
    # start-up fails partway through.
    try:
        # Initialize mockers
        mockers.__enter__()

        # Get runtime and create endpoint, using the namespace and component
        # name owned by the mocker process rather than a hard-coded literal.
        runtime = get_runtime()
        namespace = runtime.namespace(mockers.namespace)
        component = namespace.component(mockers.component_name)
        endpoint = component.endpoint("generate")

        _test_router_decisions(
            mockers, endpoint, MODEL_NAME, request, test_dp_rank=True
        )
    finally:
        mockers.__exit__(None, None, None)
622
623


624
@pytest.mark.parametrize("registration_order", ["prefill_first", "decode_first"])
@pytest.mark.timeout(59)  # ~3x average (~19.51s), rounded up
def test_router_decisions_disagg(
    request, runtime_services_dynamic_ports, predownload_tokenizers, registration_order
):
    """Validate KV cache prefix reuse in disaggregated prefill-decode setup.

    Tests that progressive requests with overlapping prefixes are routed to the
    same prefill worker due to KV cache reuse.

    Parameterized to test both registration orders:
    - prefill_first: prefill workers register before decode workers
    - decode_first: decode workers register before prefill workers
    """
    logger.info(
        f"Starting disaggregated router prefix reuse test "
        f"(registration_order={registration_order})"
    )

    # Generate shared namespace for prefill and decode workers
    namespace_suffix = generate_random_suffix()
    shared_namespace = f"test-namespace-{namespace_suffix}"

    # Create mocker args
    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}

    workers_per_role = 4
    if registration_order == "prefill_first":
        start_order = ("prefill", "decode")
    else:
        start_order = ("decode", "prefill")

    # Worker type -> process. An entry is recorded as soon as the process
    # object exists (before it is started) so the finally block can always
    # attempt to stop it.
    workers: Dict[str, Any] = {}

    try:
        for position, worker_type in zip(("first", "second"), start_order):
            logger.info(
                f"Starting {workers_per_role} {worker_type} mocker instances ({position})"
            )
            process = DisaggMockerProcess(
                request,
                namespace=shared_namespace,
                worker_type=worker_type,
                mocker_args=mocker_args,
                num_mockers=workers_per_role,
            )
            workers[worker_type] = process
            process.__enter__()
            logger.info(
                f"{worker_type.capitalize()} workers using endpoint: {process.endpoint}"
            )

        # Get unique port for this test
        frontend_port = get_unique_ports(
            request, num_ports=1, registration_order=registration_order
        )[0]

        # Run disagg routing test
        _test_router_decisions_disagg(
            prefill_workers=workers["prefill"],
            decode_workers=workers["decode"],
            block_size=BLOCK_SIZE,
            request=request,
            frontend_port=frontend_port,
            test_payload=TEST_PAYLOAD,
        )
    finally:
        # Always stop decode workers before prefill workers, regardless of
        # the order in which they were started.
        for worker_type in ("decode", "prefill"):
            process = workers.get(worker_type)
            if process is not None:
                process.__exit__(None, None, None)
723
724


725
@pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True)
@pytest.mark.timeout(39)  # ~3x average (~12.84s), rounded up
def test_busy_threshold_endpoint(
    request, runtime_services_dynamic_ports, predownload_tokenizers, request_plane
):
    """Test that the /busy_threshold endpoint can be hit and responds correctly.

    TODO: This doesn't actually test any e2e rejection for now. A proper test would:
    1. Set a very low threshold
    2. Send enough requests to exceed the threshold
    3. Verify that subsequent requests are rejected with 503

    For now, this test only verifies the endpoint is accessible and returns valid responses.
    """
    logger.info(
        f"Starting busy_threshold endpoint test with request_plane={request_plane}"
    )

    mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}

    logger.info(f"Starting {NUM_MOCKERS} mocker instances")
    mockers = MockerProcess(
        request,
        mocker_args=mocker_args,
        num_mockers=NUM_MOCKERS,
        request_plane=request_plane,
    )
    logger.info(f"All mockers using endpoint: {mockers.endpoint}")

    # __enter__ is called inside the try so that __exit__ still runs if
    # start-up fails partway through.
    try:
        mockers.__enter__()

        frontend_port = get_unique_ports(
            request, num_ports=1, request_plane=request_plane
        )[0]

        _test_busy_threshold_endpoint(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            request=request,
            frontend_port=frontend_port,
            test_payload=TEST_PAYLOAD,
            request_plane=request_plane,
        )
    finally:
        mockers.__exit__(None, None, None)