# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os
5
import sys
6
7
8
9

from tests.utils.managed_process import ManagedProcess


10
11
12
13
14
15
16
class FrontendRouterProcess(ManagedProcess):
    """Manages a dynamo.frontend process with configurable --router-mode.

    Supports all router modes (round-robin, random, kv, direct) and all
    KV-specific options (block size, thresholds, durable events, disagg).
    block_size is only sent to the CLI when router_mode is "kv".
    """

    # Keys that must be present in ``router_aic_config``. The other keys read
    # from it (``aic_tp_size``, ``aic_backend_version``) are optional.
    _REQUIRED_AIC_KEYS = ("aic_backend", "aic_system", "aic_model_path")

    def __init__(
        self,
        request,
        block_size: int,
        frontend_port: int,
        namespace: str,
        store_backend: str = "etcd",
        enforce_disagg: bool = False,
        blocks_threshold: float | None = None,
        tokens_threshold: float | None = None,
        tokens_threshold_frac: float | None = None,
        request_plane: str = "nats",
        durable_kv_events: bool = False,
        router_mode: str = "kv",
        min_initial_workers: int | None = None,
        router_aic_config: dict[str, str | int] | None = None,
        serve_indexer: bool = False,
        use_remote_indexer: bool = False,
    ):
        """Build the ``dynamo.frontend`` command line/env and hand them to ManagedProcess.

        Args:
            request: pytest ``request`` fixture; ``request.node.name`` is used
                as the log directory.
            block_size: Value for ``--kv-cache-block-size``; only passed when
                ``router_mode == "kv"``.
            frontend_port: Value for ``--http-port``; also the port polled by
                the readiness check on ``/v1/models``.
            namespace: Value for ``--namespace``.
            store_backend: Value for ``--discovery-backend``.
            enforce_disagg: Adds ``--enforce-disagg`` when True.
            blocks_threshold: ``--active-decode-blocks-threshold`` when not None.
            tokens_threshold: ``--active-prefill-tokens-threshold`` when not None.
            tokens_threshold_frac: ``--active-prefill-tokens-threshold-frac``
                when not None.
            request_plane: Exported to the child as ``DYN_REQUEST_PLANE``.
            durable_kv_events: Adds ``--router-durable-kv-events`` when True.
            router_mode: Value for ``--router-mode``; also used in the
                process display name and stored as ``self.router_mode``.
            min_initial_workers: Exported as ``DYN_ROUTER_MIN_INITIAL_WORKERS``
                when not None.
            router_aic_config: When not None, adds
                ``--router-track-prefill-tokens --router-prefill-load-model aic``
                plus the ``--aic-*`` flags. Must contain ``aic_backend``,
                ``aic_system`` and ``aic_model_path``; ``aic_tp_size`` defaults
                to 1 and ``aic_backend_version`` is only passed when present.
            serve_indexer: Adds ``--serve-indexer`` when True.
            use_remote_indexer: Adds ``--use-remote-indexer`` when True.

        Raises:
            KeyError: If ``router_aic_config`` is missing a required key.
        """
        command = [
            sys.executable,
            "-m",
            "dynamo.frontend",
            "--router-mode",
            router_mode,
            "--http-port",
            str(frontend_port),
            "--discovery-backend",
            store_backend,
            "--namespace",
            namespace,
        ]

        # The block size is only meaningful to the KV router; other modes
        # never receive the flag.
        if router_mode == "kv":
            command.extend(["--kv-cache-block-size", str(block_size)])

        if enforce_disagg:
            command.append("--enforce-disagg")

        # Optional numeric thresholds: emit a flag only for values that were set.
        # Order matches the original, hand-written if-chain.
        optional_thresholds = (
            ("--active-decode-blocks-threshold", blocks_threshold),
            ("--active-prefill-tokens-threshold", tokens_threshold),
            ("--active-prefill-tokens-threshold-frac", tokens_threshold_frac),
        )
        for flag, value in optional_thresholds:
            if value is not None:
                command.extend([flag, str(value)])

        if durable_kv_events:
            command.append("--router-durable-kv-events")

        if serve_indexer:
            command.append("--serve-indexer")

        if use_remote_indexer:
            command.append("--use-remote-indexer")

        if router_aic_config is not None:
            # Fail with a message naming every missing key instead of a bare
            # KeyError for whichever lookup happens to run first.
            missing = [
                key for key in self._REQUIRED_AIC_KEYS if key not in router_aic_config
            ]
            if missing:
                raise KeyError(
                    f"router_aic_config is missing required key(s): {missing}"
                )
            command.extend(
                [
                    "--router-track-prefill-tokens",
                    "--router-prefill-load-model",
                    "aic",
                    "--aic-backend",
                    str(router_aic_config["aic_backend"]),
                    "--aic-system",
                    str(router_aic_config["aic_system"]),
                    "--aic-model-path",
                    str(router_aic_config["aic_model_path"]),
                    "--aic-tp-size",
                    str(router_aic_config.get("aic_tp_size", 1)),
                ]
            )
            if "aic_backend_version" in router_aic_config:
                command.extend(
                    [
                        "--aic-backend-version",
                        str(router_aic_config["aic_backend_version"]),
                    ]
                )

        # Child inherits the test's environment plus the router-specific overrides.
        env = os.environ.copy()
        env["DYN_REQUEST_PLANE"] = request_plane
        if min_initial_workers is not None:
            env["DYN_ROUTER_MIN_INITIAL_WORKERS"] = str(min_initial_workers)

        super().__init__(
            command=command,
            env=env,
            timeout=60,
            display_output=True,
            health_check_ports=[frontend_port],
            health_check_urls=[
                (f"http://localhost:{frontend_port}/v1/models", self._check_ready)
            ],
            log_dir=request.node.name,
            terminate_all_matching_process_names=False,
            display_name=f"dynamo-frontend-{router_mode}",
        )
        self.port = frontend_port
        self.router_mode = router_mode

    def _check_ready(self, response):
        """Return True once the frontend's ``/v1/models`` endpoint answers HTTP 200.

        The same check is used for every router mode (kv, random,
        round-robin, direct).
        """
        return response.status_code == 200
128
129


130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class DirectRouterProcess(ManagedProcess):
    """Manages a process in Direct routing mode for EPP-style disagg tests.

    In Direct mode, the router does not select workers itself — worker IDs
    must be supplied via x-worker-instance-id and x-prefill-instance-id headers.
    """

    def __init__(
        self,
        request,
        frontend_port: int,
        namespace: str,
        enforce_disagg: bool = True,
        request_plane: str = "nats",
        store_backend: str | None = None,
    ):
        """Build the ``dynamo.frontend --router-mode direct`` command and launch it.

        Args:
            request: pytest ``request`` fixture; ``request.node.name`` is used
                as the log directory.
            frontend_port: Value for ``--http-port``; also the port polled by
                the readiness check on ``/v1/models``.
            namespace: Value for ``--namespace``.
            enforce_disagg: Adds ``--enforce-disagg`` when True (the default).
            request_plane: Exported to the child as ``DYN_REQUEST_PLANE``.
            store_backend: Optional ``--discovery-backend`` value, mirroring
                FrontendRouterProcess. When None (the default) the flag is
                omitted, exactly as this class behaved before the parameter
                existed.
        """
        command = [
            sys.executable,
            "-m",
            "dynamo.frontend",
            "--router-mode",
            "direct",
            "--http-port",
            str(frontend_port),
            "--namespace",
            namespace,
        ]

        if store_backend is not None:
            command.extend(["--discovery-backend", store_backend])

        if enforce_disagg:
            command.append("--enforce-disagg")

        # Child inherits the test's environment plus the request-plane override.
        env = os.environ.copy()
        env["DYN_REQUEST_PLANE"] = request_plane

        super().__init__(
            command=command,
            env=env,
            timeout=60,
            display_output=True,
            health_check_ports=[frontend_port],
            health_check_urls=[
                (f"http://localhost:{frontend_port}/v1/models", self._check_ready)
            ],
            log_dir=request.node.name,
            terminate_all_matching_process_names=False,
            display_name="dynamo-frontend-direct",
        )
        self.port = frontend_port
        # Exposed for parity with FrontendRouterProcess.router_mode.
        self.router_mode = "direct"

    def _check_ready(self, response):
        """Return True once the frontend's ``/v1/models`` endpoint answers HTTP 200."""
        return response.status_code == 200


185
186
187
# Backward-compatible alias so existing callers that import KVRouterProcess
# continue to work without changes. This binds the very same class object
# (not a subclass), and FrontendRouterProcess defaults to router_mode="kv",
# so KVRouterProcess(...) still launches a KV-routing frontend by default.
KVRouterProcess = FrontendRouterProcess