conftest.py 9.65 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Shared fixtures for frontend tests."""

import logging
import os
import shutil
import time

import pytest
import requests
import tritonclient.grpc as grpcclient

from tests.utils.constants import QWEN
from tests.utils.managed_process import DynamoFrontendProcess, ManagedProcess
from tests.utils.port_utils import allocate_port, deallocate_port

logger = logging.getLogger(__name__)


@pytest.fixture(scope="function")
def start_services_with_http(
    request, runtime_services_dynamic_ports, dynamo_dynamic_ports
):
    """Launch a per-test HTTP frontend bound to dynamically allocated ports.

    The fixture is function-scoped so tests can run in parallel: every test
    receives its own frontend listening on its own port. Depending on
    ``runtime_services_dynamic_ports`` pulls in the NATS/Etcd runtime services
    on dynamic ports; that fixture's value is not read here.

    No worker is started by this fixture; each test module launches the
    worker processes it needs.

    Yields:
        ``(frontend_port, system_port)``: the frontend's HTTP port and the
        first dynamically allocated system port, for use by worker processes.
    """
    frontend_port = dynamo_dynamic_ports.frontend_port
    frontend = DynamoFrontendProcess(
        request,
        frontend_port=frontend_port,
        # Under xdist every worker runs its own frontend; never kill the others.
        terminate_existing=False,
    )
    with frontend:
        logger.info(f"HTTP Frontend started on port {frontend_port}")
        yield frontend_port, dynamo_dynamic_ports.system_ports[0]


def check_grpc_server_ready(
    port: int, max_attempts: int = 30, retry_delay: float = 0.5
) -> bool:
    """Check if gRPC server is ready to accept connections.

    Polls ``is_server_ready()`` on ``localhost:port``. The server counts as
    ready only if it reports ready twice, 0.5 s apart. This guards against a
    server that reports ready and then briefly drops out during parallel
    startup. A fresh client is created for each attempt and is always closed
    before the next one.

    Args:
        port: gRPC server port
        max_attempts: Maximum number of readiness attempts
        retry_delay: Delay in seconds between attempts. It applies after every
            unsuccessful attempt, whether the client raised or the server
            reported "not ready".

    Returns:
        True if server is ready. The function never returns False; failure
        is always reported by raising.

    Raises:
        Exception: The client error from the final attempt, if that attempt
            raised (re-raised unchanged).
        RuntimeError: If the server answered but never confirmed readiness
            within ``max_attempts`` attempts.
    """
    target = f"localhost:{port}"
    for attempt in range(1, max_attempts + 1):
        client = None
        try:
            client = grpcclient.InferenceServerClient(target)
            if client.is_server_ready():
                logger.info(
                    f"gRPC server is ready on port {port} (attempt {attempt}/{max_attempts})"
                )
                # Re-check after a short pause to make sure the server is
                # stable before handing it to (possibly parallel) tests.
                time.sleep(0.5)
                if client.is_server_ready():
                    logger.info(f"gRPC server confirmed stable on port {port}")
                    return True
                logger.warning(
                    f"gRPC server became unstable on port {port}, retrying..."
                )
            else:
                logger.debug(
                    f"gRPC server not ready on attempt {attempt}: reported not ready"
                )
        except Exception as e:
            if attempt == max_attempts:
                logger.error(
                    f"gRPC server not ready after {max_attempts} attempts: {e}"
                )
                raise
            logger.debug(f"gRPC server not ready on attempt {attempt}: {e}")
        finally:
            # Release the channel opened for this attempt; a new client is
            # created on the next iteration, so keeping it would leak.
            if client is not None:
                try:
                    client.close()
                except Exception:
                    logger.debug(
                        "Failed to close gRPC client for port %s", port, exc_info=True
                    )

        # Back off between attempts (including "not ready" answers), but not
        # after the final one.
        if attempt < max_attempts:
            time.sleep(retry_delay)

    raise RuntimeError(
        f"gRPC server on port {port} did not report ready after {max_attempts} attempts"
    )


def wait_for_http_completions_ready(
    *,
    frontend_port: int,
    model: str,
    max_attempts: int = 30,
    retry_delay: float = 0.25,
) -> None:
    """Wait until the HTTP completions route can actually serve the given model.

    Why this exists:
    - `/v1/models` can list a model slightly before the HTTP completions route is
      ready to route requests to it (under xdist parallel startup).
    - If we start sending requests immediately, we can intermittently get 404
      "Model not found" even though the model shows up in `/v1/models`.

    Sends a one-token completion request up to ``max_attempts`` times, sleeping
    ``retry_delay`` seconds between attempts, and returns on the first HTTP 200.

    Args:
        frontend_port: Port of the HTTP frontend on localhost.
        model: Model name to put in the completion request.
        max_attempts: Maximum number of requests to send.
        retry_delay: Seconds to wait between attempts.

    Raises:
        RuntimeError: If no attempt returned HTTP 200. The message includes
            the last HTTP status (``None`` if the last attempt failed at the
            transport level) and the last response body or error text.
    """
    url = f"http://localhost:{frontend_port}/v1/completions"
    payload = {"model": model, "prompt": "ping", "max_tokens": 1}
    last_status: int | None = None
    last_body: str = ""

    for attempt in range(1, max_attempts + 1):
        try:
            resp = requests.post(url, json=payload, timeout=10)
        except requests.RequestException as e:
            # Connection refused / timeout. There is no HTTP status for this
            # attempt, so do not report a stale one from an earlier attempt.
            last_status = None
            last_body = str(e)
        else:
            last_status = resp.status_code
            last_body = resp.text

            if resp.status_code == 200:
                return

            # A 404 "Model not found" is the common transient during startup:
            # the model is discovered but not yet routable. Anything else may
            # be a real problem (e.g. schema validation changed). It is still
            # retried, but logged so a persistent error is visible.
            if not (resp.status_code == 404 and "Model not found" in resp.text):
                logger.warning(
                    "Unexpected status %s from %s (attempt %d/%d): %s",
                    resp.status_code,
                    url,
                    attempt,
                    max_attempts,
                    resp.text[:500],
                )

        # Sleeping after the final attempt would only delay the error below.
        if attempt < max_attempts:
            time.sleep(retry_delay)

    raise RuntimeError(
        "HTTP completions route did not become ready "
        f"(frontend_port={frontend_port}, model={model}, "
        f"last_status={last_status}, last_body={last_body})"
    )


@pytest.fixture(scope="function")
def start_services_with_grpc(
    request, runtime_services_dynamic_ports, dynamo_dynamic_ports
):
    """Start gRPC frontend with dynamic ports.

    Function-scoped to allow parallel test execution.
    Each test gets its own gRPC frontend on a unique port.
    Uses runtime_services_dynamic_ports which provides isolated NATS/Etcd per test,
    so no namespace conflicts - each test has its own Etcd/NATS instance!

    Allocates an additional port for HTTP metrics server (used by gRPC service internally)
    to enable parallel test execution without port 8788 conflicts. The port is
    released again when the fixture finishes, even if startup fails.

    Individual test files should start their specific worker processes.

    Yields:
        Tuple of (frontend_port, system_port) for use by worker processes

    Raises:
        RuntimeError: If the gRPC server does not report ready.
    """
    ports = dynamo_dynamic_ports
    # Allocate additional port for HTTP metrics server (gRPC service requirement).
    # NOTE(review): 8788 is presumably the preferred/starting port for
    # allocate_port; confirm against tests.utils.port_utils.
    grpc_metrics_port = allocate_port(8788)

    try:
        with DynamoFrontendProcess(
            request,
            frontend_port=ports.frontend_port,
            terminate_existing=False,
            extra_args=[
                "--kserve-grpc-server",
                "--grpc-metrics-port",
                str(grpc_metrics_port),
            ],
        ):
            logger.info(
                f"gRPC Frontend starting on port {ports.frontend_port} "
                f"(metrics on {grpc_metrics_port})"
            )
            # check_grpc_server_ready may report failure by returning False
            # rather than raising; never hand a test a frontend that is not ready.
            if not check_grpc_server_ready(ports.frontend_port):
                raise RuntimeError(
                    f"gRPC frontend on port {ports.frontend_port} did not become ready"
                )
            yield ports.frontend_port, ports.system_ports[0]
    finally:
        deallocate_port(grpc_metrics_port)


########################################################
# Shared Worker Classes
########################################################


class MockerWorkerProcess(ManagedProcess):
    """Shared mocker worker process for frontend tests.

    Uses dynamo.mocker with configurable model and speedup ratio.
    Can be used by any frontend test that needs a fast mock backend.

    Startup is considered complete when both health checks pass:
    - the frontend's ``/v1/models`` returns 200 and lists at least one model;
    - the worker's ``/health`` endpoint on ``system_port`` reports
      ``status == "ready"``.
    """

    def __init__(
        self,
        request,
        model: str,
        frontend_port: int,
        system_port: int,
        speedup_ratio: int = 100,
        worker_id: str = "mocker-worker",
    ):
        """Build the mocker command/environment and configure the managed process.

        Args:
            request: pytest ``request`` object; its node name is used to build
                the log directory name.
            model: Passed to the mocker as ``--model-path``.
            frontend_port: Port of the HTTP frontend polled for ``/v1/models``.
            system_port: Port exported as ``DYN_SYSTEM_PORT``; its ``/health``
                endpoint is polled for readiness.
            speedup_ratio: Passed to the mocker as ``--speedup-ratio``.
            worker_id: Identifier used in log messages and the log dir name.
        """
        self.worker_id = worker_id
        self.frontend_port = frontend_port
        self.system_port = system_port

        command = [
            "python3",
            "-m",
            "dynamo.mocker",
            "--model-path",
            model,
            "--speedup-ratio",
            str(speedup_ratio),
        ]

        env = os.environ.copy()
        env["DYN_LOG"] = "debug"
        # NOTE(review): presumably ties the worker's /health status to the
        # "generate" endpoint's health; confirm against the runtime docs.
        env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
        env["DYN_SYSTEM_PORT"] = str(system_port)

        # Per-test, per-worker log directory (a path relative to the current
        # working directory). Any leftover directory from a previous run is
        # removed first.
        log_dir = f"{request.node.name}_{worker_id}"

        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass

        super().__init__(
            command=command,
            env=env,
            health_check_urls=[
                (f"http://localhost:{frontend_port}/v1/models", self._check_models_api),
                (f"http://localhost:{system_port}/health", self.is_ready),
            ],
            timeout=300,
            display_output=True,
            # Parallel tests run their own workers; never kill them.
            terminate_existing=False,
            stragglers=["VLLM::EngineCore"],
            straggler_commands=["-m dynamo.mocker"],
            log_dir=log_dir,
        )

    def _check_models_api(self, response):
        """Return True if ``/v1/models`` answered 200 with a non-empty ``data`` list.

        Any malformed response (non-JSON body, unexpected shape) is treated as
        "not ready yet" rather than raising out of the health check.
        """
        try:
            if response.status_code != 200:
                return False
            data = response.json()
            models = data.get("data", [])
            return len(models) > 0
        except Exception:
            # Deliberately broad: this is a polling predicate, and any parse
            # or shape error simply means "not ready yet".
            return False

    def is_ready(self, response) -> bool:
        """Return True if the worker's ``/health`` body reports ``status == "ready"``.

        A body that is not valid JSON, or JSON that is not an object (e.g. a
        list or bare string), counts as not ready instead of raising out of
        the health check.
        """
        try:
            body = response.json()
        except ValueError:
            logger.warning("%s health response is not valid JSON", self.worker_id)
            return False

        # Only a JSON object can carry a "status" field; anything else (None,
        # list, string, number) has no usable status.
        status = body.get("status") if isinstance(body, dict) else None

        ready = status == "ready"
        if ready:
            logger.info("%s status is ready", self.worker_id)
        else:
            logger.warning("%s status is not ready: %s", self.worker_id, status)
        return ready


@pytest.fixture(scope="function")
def start_services_with_mocker(
    request, start_services_with_http, predownload_tokenizers
):
    """Run a mocker worker behind this test's HTTP frontend.

    Function-scoped so tests can run in parallel: each test gets its own
    frontend plus mocker worker on unique ports. The fixture yields only
    after a real completion request succeeds, so the model is routable.
    ``predownload_tokenizers`` is requested only for its setup; its value is
    not read here.

    Yields:
        frontend_port: Port where frontend is running
    """
    port, worker_system_port = start_services_with_http
    # QWEN is the only model wired up for now; per-test model selection is
    # not implemented yet.
    model_name = QWEN

    worker = MockerWorkerProcess(request, model_name, port, worker_system_port)
    with worker:
        wait_for_http_completions_ready(frontend_port=port, model=model_name)
        logger.info(f"Mocker Worker started for test on port {port}")
        yield port