# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os

from tests.utils.managed_process import ManagedProcess


class FrontendRouterProcess(ManagedProcess):
    """Manages a dynamo.frontend process with a configurable --router-mode.

    Supports all router modes (round-robin, random, kv, direct) and all
    KV-specific options (block size, thresholds, durable events, disagg).
    ``block_size`` is only sent to the CLI when ``router_mode`` is "kv".
    """

    def __init__(
        self,
        request,
        block_size: int,
        frontend_port: int,
        namespace: str,
        store_backend: str = "etcd",
        enforce_disagg: bool = False,
        blocks_threshold: float | None = None,
        tokens_threshold: float | None = None,
        tokens_threshold_frac: float | None = None,
        request_plane: str = "nats",
        durable_kv_events: bool = False,
        router_mode: str = "kv",
    ):
        """Build the ``dynamo.frontend`` command line and hand it to ManagedProcess.

        Args:
            request: Test request object; ``request.node.name`` is used as the
                log directory (presumably a pytest ``FixtureRequest`` -- confirm).
            block_size: Passed as ``--kv-cache-block-size`` only when
                ``router_mode == "kv"``; ignored for every other mode.
            frontend_port: ``--http-port`` value; also the port polled by the
                readiness check.
            namespace: ``--namespace`` value.
            store_backend: ``--discovery-backend`` value.
            enforce_disagg: Appends ``--enforce-disagg`` when True.
            blocks_threshold: ``--active-decode-blocks-threshold``; omitted
                when None.
            tokens_threshold: ``--active-prefill-tokens-threshold``; omitted
                when None.
            tokens_threshold_frac: ``--active-prefill-tokens-threshold-frac``;
                omitted when None.
            request_plane: Exported to the child as ``DYN_REQUEST_PLANE``.
            durable_kv_events: Appends ``--router-durable-kv-events`` when True.
            router_mode: ``--router-mode`` value.
        """
        command = [
            "python3",
            "-m",
            "dynamo.frontend",
            "--router-mode",
            router_mode,
            "--http-port",
            str(frontend_port),
            "--discovery-backend",
            store_backend,
            "--namespace",
            namespace,
        ]

        # The block size is only meaningful to the KV router; other modes
        # never receive the flag.
        if router_mode == "kv":
            command.extend(["--kv-cache-block-size", str(block_size)])

        if enforce_disagg:
            command.append("--enforce-disagg")

        # Optional thresholds: only forwarded when explicitly set, so the
        # frontend's own defaults apply otherwise.
        if blocks_threshold is not None:
            command.extend(["--active-decode-blocks-threshold", str(blocks_threshold)])

        if tokens_threshold is not None:
            command.extend(["--active-prefill-tokens-threshold", str(tokens_threshold)])

        if tokens_threshold_frac is not None:
            command.extend(
                ["--active-prefill-tokens-threshold-frac", str(tokens_threshold_frac)]
            )

        if durable_kv_events:
            command.append("--router-durable-kv-events")

        # Copy the parent environment so the override does not leak into
        # the test process itself.
        env = os.environ.copy()
        env["DYN_REQUEST_PLANE"] = request_plane

        super().__init__(
            command=command,
            env=env,
            timeout=60,
            display_output=True,
            health_check_ports=[frontend_port],
            health_check_urls=[
                (f"http://localhost:{frontend_port}/v1/models", self._check_ready)
            ],
            log_dir=request.node.name,
            terminate_all_matching_process_names=False,
        )
        self.port = frontend_port
        self.router_mode = router_mode

    def _check_ready(self, response) -> bool:
        """Check if KV, random, round-robin, or direct router is ready.

        Readiness is simply an HTTP 200 from the ``/v1/models`` endpoint.
        """
        return response.status_code == 200

    def __exit__(self, exc_type, exc_val, exc_tb):
        super().__exit__(exc_type, exc_val, exc_tb)


class DirectRouterProcess(ManagedProcess):
    """Runs dynamo.frontend with ``--router-mode direct`` for EPP-style disagg tests.

    The frontend does not pick workers on its own in this mode: each request
    must name its targets through the x-worker-instance-id and
    x-prefill-instance-id headers.
    """

    def __init__(
        self,
        request,
        frontend_port: int,
        namespace: str,
        enforce_disagg: bool = True,
        request_plane: str = "nats",
    ):
        """Assemble the direct-mode command line and start it via ManagedProcess.

        Args:
            request: Test request object; ``request.node.name`` names the log
                directory (presumably a pytest ``FixtureRequest`` -- confirm).
            frontend_port: ``--http-port`` value and readiness-check port.
            namespace: ``--namespace`` value.
            enforce_disagg: Appends ``--enforce-disagg`` when True (the default).
            request_plane: Exported to the child as ``DYN_REQUEST_PLANE``.
        """
        disagg_flags = ["--enforce-disagg"] if enforce_disagg else []
        cli = [
            "python3",
            "-m",
            "dynamo.frontend",
            *("--router-mode", "direct"),
            *("--http-port", str(frontend_port)),
            *("--namespace", namespace),
            *disagg_flags,
        ]

        # Fresh dict built from the parent environment plus the override;
        # the test process's own environment is left untouched.
        child_env = {**os.environ, "DYN_REQUEST_PLANE": request_plane}

        models_url = f"http://localhost:{frontend_port}/v1/models"
        super().__init__(
            command=cli,
            env=child_env,
            timeout=60,
            display_output=True,
            health_check_ports=[frontend_port],
            health_check_urls=[(models_url, self._check_ready)],
            log_dir=request.node.name,
            terminate_all_matching_process_names=False,
        )
        self.port = frontend_port

    def _check_ready(self, response):
        """Treat an HTTP 200 from ``/v1/models`` as ready."""
        ok = response.status_code == 200
        return ok

    def __exit__(self, exc_type, exc_val, exc_tb):
        super().__exit__(exc_type, exc_val, exc_tb)


# Backward-compatible alias so existing callers that import KVRouterProcess
# continue to work without changes. Its default router_mode is "kv", which
# matches the historical behaviour of the old class name.
KVRouterProcess = FrontendRouterProcess