test_completion_mocker_engine.py 6.45 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0


from __future__ import annotations

import logging
import os
import shutil
import time
from typing import Any, Dict

import pytest
import requests

from tests.conftest import EtcdServer, NatsServer
from tests.utils.constants import QWEN
from tests.utils.managed_process import ManagedProcess
from tests.utils.payloads import check_models_api

logger = logging.getLogger(__name__)

TEST_MODEL = QWEN


class DynamoFrontendProcess(ManagedProcess):
    """Process manager for the Dynamo frontend.

    Launches ``python -m dynamo.frontend`` in round-robin router mode with a
    copy of the current environment and a per-test log directory.
    """

    def __init__(self, request):
        """Build the frontend command, environment and log directory.

        Args:
            request: pytest ``request`` object; ``request.node.name`` is used
                to derive the log directory name.
        """
        command = ["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"]

        # Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
        env = os.environ.copy()
        env.pop("DYN_SYSTEM_PORT", None)

        log_dir = f"{request.node.name}_frontend"

        # Clean up any existing log directory from previous runs
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            # Directory doesn't exist, which is fine
            pass
        else:
            # Only report a cleanup when something was actually removed
            logger.info("Cleaned up existing log directory: %s", log_dir)

        super().__init__(
            command=command,
            env=env,
            display_output=True,
            terminate_existing=True,
            log_dir=log_dir,
        )


class MockWorkerProcess(ManagedProcess):
    """Process manager for a ``dynamo.mocker`` worker serving ``TEST_MODEL``."""

    def __init__(
        self,
        request,
        worker_id: str = "mocker-worker",
        system_port: int = 8083,
    ):
        """Build the mocker command, environment, health checks and log directory.

        Args:
            request: pytest ``request`` object; ``request.node.name`` is used
                to derive the log directory name.
            worker_id: Label used in log messages and in the log directory name.
            system_port: Port exported as ``DYN_SYSTEM_PORT`` and used for the
                worker's ``/health`` check URL.
        """
        self.worker_id = worker_id

        command = [
            "python3",
            "-m",
            "dynamo.mocker",
            "--model-path",
            TEST_MODEL,
            "--speedup-ratio",
            "100",
        ]

        env = os.environ.copy()
        env["DYN_LOG"] = "debug"
        env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
        # Single source of truth for the port: the env var and the health URL must agree
        env["DYN_SYSTEM_PORT"] = str(system_port)

        log_dir = f"{request.node.name}_{worker_id}"

        # Drop logs left over from a previous run; a missing directory is fine
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            pass

        super().__init__(
            command=command,
            env=env,
            health_check_urls=[
                ("http://localhost:8000/v1/models", check_models_api),
                (f"http://localhost:{system_port}/health", self.is_ready),
            ],
            timeout=300,
            display_output=True,
            terminate_existing=False,
            stragglers=["VLLM::EngineCore"],
            straggler_commands=["-m dynamo.mocker"],
            log_dir=log_dir,
        )

    def is_ready(self, response) -> bool:
        """Return True when the health response reports ``"status": "ready"``.

        Args:
            response: Object exposing ``.json()`` (presumably a
                ``requests.Response`` - confirm against ``ManagedProcess``).

        Returns:
            True only if the body is a JSON object whose ``status`` equals
            ``"ready"``; False for invalid JSON, non-object JSON, or any
            other status.
        """
        try:
            body = response.json()
        except ValueError:
            logger.warning("%s health response is not valid JSON", self.worker_id)
            return False

        # A JSON array or scalar has no "status" field; treat it as not ready
        # instead of letting ``.get`` raise AttributeError.
        status = body.get("status") if isinstance(body, dict) else None

        is_ready = status == "ready"
        if is_ready:
            logger.info("%s status is ready", self.worker_id)
        else:
            logger.warning("%s status is not ready: %s", self.worker_id, status)
        return is_ready


def _send_completion_request(
    payload: Dict[str, Any],
    timeout: int = 180,
) -> requests.Response:
    """POST *payload* as JSON to the local ``/v1/completions`` endpoint.

    Args:
        payload: Request body; serialized as JSON by ``requests``.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The raw ``requests.Response``; the status code is not checked here.
    """
    headers = {"Content-Type": "application/json"}
    # Go through the module logger (not print) so the send timestamp ends up
    # alongside the rest of this file's log output.
    logger.info("Sending request: %s", time.time())

    return requests.post(
        "http://localhost:8000/v1/completions",
        headers=headers,
        json=payload,
        timeout=timeout,
    )


@pytest.fixture(scope="module")
def runtime_services(request):
    """Bring up NATS and etcd once per module and yield ``(nats, etcd)``."""
    # NATS is entered first and etcd second; they are torn down in reverse order.
    with NatsServer(request) as nats, EtcdServer(request) as etcd:
        yield nats, etcd


@pytest.fixture(scope="module")
def start_services(request, runtime_services):
    """Run one frontend and one mock worker for every test in this module.

    Requesting ``runtime_services`` ensures that fixture is set up first.
    The frontend is entered before the worker is created, and the fixture
    yields no value.
    """
    frontend = DynamoFrontendProcess(request)
    with frontend:
        logger.info("Frontend started for tests")
        worker = MockWorkerProcess(request)
        with worker:
            logger.info("Worker started for tests")
            yield


@pytest.mark.usefixtures("start_services")
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.post_merge
@pytest.mark.model(TEST_MODEL)
def test_completion_string_prompt() -> None:
    """A plain string prompt is answered with HTTP 200."""
    payload: Dict[str, Any] = {
        "model": TEST_MODEL,
        "prompt": "Tell me about Mars",
        "max_tokens": 2000,
    }

    response = _send_completion_request(payload)

    assert response.status_code == 200, (
        f"Completion request failed with status "
        f"{response.status_code}: {response.text}"
    )


167
168
@pytest.mark.usefixtures("start_services")
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.post_merge
@pytest.mark.model(TEST_MODEL)
def test_completion_empty_array_prompt() -> None:
    """An empty prompt array is rejected with HTTP 400."""
    payload: Dict[str, Any] = {
        "model": TEST_MODEL,
        "prompt": [],
        "max_tokens": 2000,
    }

    response = _send_completion_request(payload)

    # Trailing space after "got" keeps the status code from fusing with the text
    assert response.status_code == 400, (
        f"Completion request should fail with status 400 but got "
        f"{response.status_code}: {response.text}"
    )


187
188
@pytest.mark.usefixtures("start_services")
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.post_merge
@pytest.mark.model(TEST_MODEL)
def test_completion_single_element_array_prompt() -> None:
    """A one-element prompt array is answered with HTTP 200."""
    payload: Dict[str, Any] = {
        "model": TEST_MODEL,
        "prompt": ["Tell me about Mars"],
        "max_tokens": 2000,
    }

    response = _send_completion_request(payload)

    assert response.status_code == 200, (
        f"Completion request failed with status "
        f"{response.status_code}: {response.text}"
    )


@pytest.mark.usefixtures("start_services")
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.post_merge
@pytest.mark.model(TEST_MODEL)
def test_completion_multi_element_array_prompt() -> None:
    """A multi-element prompt array yields one choice per prompt."""
    payload: Dict[str, Any] = {
        "model": TEST_MODEL,
        "prompt": [
            "Tell me about Mars",
            "Tell me about Ceres",
            "Tell me about Jupiter",
        ],
        "max_tokens": 300,
    }

    response = _send_completion_request(payload)

    # Check the status before parsing: a non-JSON error body would otherwise
    # raise a decode error and hide this more useful failure message.
    assert response.status_code == 200, (
        f"Completion request failed with status "
        f"{response.status_code}: {response.text}"
    )

    response_data = response.json()

    expected_choices = len(payload["prompt"])
    choices = len(response_data.get("choices", []))

    assert (
        expected_choices == choices
    ), f"Expected {expected_choices} choices, got {choices}"