# router_process.py
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os

from tests.utils.managed_process import ManagedProcess


class FrontendRouterProcess(ManagedProcess):
    """Manage a ``dynamo.frontend`` process launched with a configurable ``--router-mode``.

    Supports all router modes (round-robin, random, kv, direct) and all
    KV-specific options (block size, thresholds, durable events, disagg).
    ``block_size`` is only sent to the CLI when ``router_mode`` is ``"kv"``.
    """

    def __init__(
        self,
        request,
        block_size: int,
        frontend_port: int,
        namespace: str,
        store_backend: str = "etcd",
        enforce_disagg: bool = False,
        blocks_threshold: float | None = None,
        tokens_threshold: float | None = None,
        tokens_threshold_frac: float | None = None,
        request_plane: str = "nats",
        durable_kv_events: bool = False,
        router_mode: str = "kv",
        min_initial_workers: int | None = None,
        router_aic_config: dict[str, str | int] | None = None,
    ):
        """Build the frontend command line and environment, then start it via ManagedProcess.

        Args:
            request: pytest request object; ``request.node.name`` is used as the
                log directory.
            block_size: value for ``--kv-cache-block-size``; only passed when
                ``router_mode == "kv"``.
            frontend_port: HTTP port for the frontend (``--http-port``); also the
                port used for the readiness health check.
            namespace: value for ``--namespace``.
            store_backend: value for ``--discovery-backend``.
            enforce_disagg: when True, pass ``--enforce-disagg``.
            blocks_threshold: if not None, passed as
                ``--active-decode-blocks-threshold``.
            tokens_threshold: if not None, passed as
                ``--active-prefill-tokens-threshold``.
            tokens_threshold_frac: if not None, passed as
                ``--active-prefill-tokens-threshold-frac``.
            request_plane: exported as ``DYN_REQUEST_PLANE`` in the process env.
            durable_kv_events: when True, pass ``--router-durable-kv-events``.
            router_mode: value for ``--router-mode``.
            min_initial_workers: if not None, exported as
                ``DYN_ROUTER_MIN_INITIAL_WORKERS`` in the process env.
            router_aic_config: if not None, enables the AIC prefill load model.
                Must contain ``aic_backend``, ``aic_system`` and
                ``aic_model_path``; may contain ``aic_tp_size`` (defaults to 1)
                and ``aic_backend_version``.

        Raises:
            KeyError: if ``router_aic_config`` is given but lacks one of its
                required keys.
        """
        command = [
            "python3",
            "-m",
            "dynamo.frontend",
            "--router-mode",
            router_mode,
            "--http-port",
            str(frontend_port),
            "--discovery-backend",
            store_backend,
            "--namespace",
            namespace,
        ]

        # The KV cache block size is a KV-router-only option; other modes skip it.
        if router_mode == "kv":
            command.extend(["--kv-cache-block-size", str(block_size)])

        if enforce_disagg:
            command.append("--enforce-disagg")

        if blocks_threshold is not None:
            command.extend(["--active-decode-blocks-threshold", str(blocks_threshold)])

        if tokens_threshold is not None:
            command.extend(["--active-prefill-tokens-threshold", str(tokens_threshold)])

        if tokens_threshold_frac is not None:
            command.extend(
                ["--active-prefill-tokens-threshold-frac", str(tokens_threshold_frac)]
            )

        if durable_kv_events:
            command.append("--router-durable-kv-events")

        if router_aic_config is not None:
            # Required keys are indexed directly so a missing one fails loudly
            # with KeyError; aic_tp_size falls back to 1.
            command.extend(
                [
                    "--router-track-prefill-tokens",
                    "--router-prefill-load-model",
                    "aic",
                    "--aic-backend",
                    str(router_aic_config["aic_backend"]),
                    "--aic-system",
                    str(router_aic_config["aic_system"]),
                    "--aic-model-path",
                    str(router_aic_config["aic_model_path"]),
                    "--aic-tp-size",
                    str(router_aic_config.get("aic_tp_size", 1)),
                ]
            )
            # The backend version flag is only emitted when explicitly configured.
            if "aic_backend_version" in router_aic_config:
                command.extend(
                    [
                        "--aic-backend-version",
                        str(router_aic_config["aic_backend_version"]),
                    ]
                )

        # Work on a copy so the test runner's own os.environ is not mutated;
        # the overrides below apply only to the launched process's env.
        env = os.environ.copy()
        env["DYN_REQUEST_PLANE"] = request_plane
        if min_initial_workers is not None:
            env["DYN_ROUTER_MIN_INITIAL_WORKERS"] = str(min_initial_workers)

        super().__init__(
            command=command,
            env=env,
            timeout=60,
            display_output=True,
            health_check_ports=[frontend_port],
            health_check_urls=[
                (f"http://localhost:{frontend_port}/v1/models", self._check_ready)
            ],
            log_dir=request.node.name,
            terminate_all_matching_process_names=False,
        )
        self.port = frontend_port
        self.router_mode = router_mode

    def _check_ready(self, response):
        """Check if KV, random, round-robin, or direct router is ready.

        Used as the health-check callback for ``/v1/models``; the frontend is
        considered ready once that endpoint answers with HTTP 200.
        """
        return response.status_code == 200

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Delegate context-manager teardown to ManagedProcess unchanged."""
        super().__exit__(exc_type, exc_val, exc_tb)


class DirectRouterProcess(ManagedProcess):
    """Manage a ``dynamo.frontend`` process in Direct routing mode for EPP-style disagg tests.

    In Direct mode, the router does not select workers itself — worker IDs
    must be supplied via x-worker-instance-id and x-prefill-instance-id headers.
    """

    def __init__(
        self,
        request,
        frontend_port: int,
        namespace: str,
        enforce_disagg: bool = True,
        request_plane: str = "nats",
        store_backend: str | None = None,
    ):
        """Build the direct-mode frontend command and environment, then start it.

        Args:
            request: pytest request object; ``request.node.name`` is used as the
                log directory.
            frontend_port: HTTP port for the frontend (``--http-port``); also the
                port used for the readiness health check.
            namespace: value for ``--namespace``.
            enforce_disagg: when True (the default), pass ``--enforce-disagg``.
            request_plane: exported as ``DYN_REQUEST_PLANE`` in the process env.
            store_backend: if not None, passed as ``--discovery-backend`` (mirrors
                FrontendRouterProcess). When None the flag is omitted, so the
                command line is exactly what earlier versions produced.
        """
        command = [
            "python3",
            "-m",
            "dynamo.frontend",
            "--router-mode",
            "direct",
            "--http-port",
            str(frontend_port),
            "--namespace",
            namespace,
        ]

        # Optional so existing callers keep the previous command line.
        if store_backend is not None:
            command.extend(["--discovery-backend", store_backend])

        if enforce_disagg:
            command.append("--enforce-disagg")

        # Work on a copy so the test runner's own os.environ is not mutated.
        env = os.environ.copy()
        env["DYN_REQUEST_PLANE"] = request_plane

        super().__init__(
            command=command,
            env=env,
            timeout=60,
            display_output=True,
            health_check_ports=[frontend_port],
            health_check_urls=[
                (f"http://localhost:{frontend_port}/v1/models", self._check_ready)
            ],
            log_dir=request.node.name,
            terminate_all_matching_process_names=False,
        )
        self.port = frontend_port

    def _check_ready(self, response):
        """Report readiness once ``/v1/models`` answers with HTTP 200."""
        return response.status_code == 200

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Delegate context-manager teardown to ManagedProcess unchanged."""
        super().__exit__(exc_type, exc_val, exc_tb)


# Backward-compatible alias so existing callers that import KVRouterProcess
# continue to work without changes.
KVRouterProcess = FrontendRouterProcess