"lib/llm/src/vscode:/vscode.git/clone" did not exist on "6c539fbdacaa8d752695ca50e44fd2c96f04fdcc"
test_completion_mocker_engine.py 6.21 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0


from __future__ import annotations

import logging
import os
import shutil
import time
from typing import Any, Dict

import pytest
import requests

from tests.conftest import EtcdServer, NatsServer
from tests.utils.constants import QWEN
from tests.utils.managed_process import ManagedProcess
from tests.utils.payloads import check_models_api

# Module-level logger shared by the process managers and fixtures below.
logger = logging.getLogger(__name__)

# Model identifier passed to the mock worker (--model-path) and used as the
# "model" field of every completion request payload in this file.
TEST_MODEL = QWEN

25
26
27
28
29
30
31
# Module-level pytest markers: pytest applies every mark in `pytestmark`
# to each test collected from this file.
pytestmark = [
    pytest.mark.e2e,
    pytest.mark.gpu_1,
    pytest.mark.post_merge,
    pytest.mark.model(TEST_MODEL),
]

32
33
34
35
36
37
38

class DynamoFrontendProcess(ManagedProcess):
    """Process manager for Dynamo frontend.

    Launches ``python -m dynamo.frontend`` in round-robin router mode and
    hands a per-test log directory to ``ManagedProcess``.
    """

    def __init__(self, request):
        """Build the frontend command, environment and log directory.

        Args:
            request: pytest ``request`` object; ``request.node.name`` is used
                to derive the log directory name.
        """
        command = ["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"]

        # Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
        env = os.environ.copy()
        env.pop("DYN_SYSTEM_PORT", None)

        log_dir = f"{request.node.name}_frontend"

        # Clean up any existing log directory from previous runs
        try:
            shutil.rmtree(log_dir)
        except FileNotFoundError:
            # Directory doesn't exist, which is fine
            pass
        else:
            logger.info("Cleaned up existing log directory: %s", log_dir)

        super().__init__(
            command=command,
            env=env,
            display_output=True,
            terminate_existing=True,
            log_dir=log_dir,
        )


class MockWorkerProcess(ManagedProcess):
    """Process manager for a ``dynamo.mocker`` worker serving ``TEST_MODEL``."""

    def __init__(self, request, worker_id: str = "mocker-worker"):
        """Build the mocker command, environment, log directory and probes.

        Args:
            request: pytest ``request`` object; ``request.node.name`` is used
                to derive the log directory name.
            worker_id: Label used in the log directory name and in the
                readiness log messages.
        """
        self.worker_id = worker_id

        # Start from the current environment and layer the worker settings on top.
        worker_env = os.environ.copy()
        worker_env.update(
            {
                "DYN_LOG": "debug",
                "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS": '["generate"]',
                "DYN_SYSTEM_PORT": "8083",
            }
        )

        launch_cmd = ["python3", "-m", "dynamo.mocker"]
        launch_cmd += ["--model-path", TEST_MODEL]
        launch_cmd += ["--speedup-ratio", "100"]

        worker_log_dir = f"{request.node.name}_{worker_id}"

        # Drop logs left over from an earlier run; a missing directory is fine.
        try:
            shutil.rmtree(worker_log_dir)
        except FileNotFoundError:
            pass

        # Two probes: the models endpoint on port 8000 and this worker's
        # /health endpoint on the DYN_SYSTEM_PORT configured above.
        readiness_probes = [
            ("http://localhost:8000/v1/models", check_models_api),
            ("http://localhost:8083/health", self.is_ready),
        ]

        super().__init__(
            command=launch_cmd,
            env=worker_env,
            health_check_urls=readiness_probes,
            timeout=300,
            display_output=True,
            terminate_existing=False,
            stragglers=["VLLM::EngineCore"],
            straggler_commands=["-m dynamo.mocker"],
            log_dir=worker_log_dir,
        )

    def is_ready(self, response) -> bool:
        """Return True when the health response reports status ``"ready"``.

        Args:
            response: Object exposing ``json()``; a ``ValueError`` from it is
                treated as "not ready".

        Returns:
            True only if the decoded body's ``"status"`` equals ``"ready"``.
        """
        try:
            body = response.json() or {}
            reported = body.get("status")
        except ValueError:
            logger.warning("%s health response is not valid JSON", self.worker_id)
            return False

        if reported == "ready":
            logger.info("%s status is ready", self.worker_id)
            return True

        logger.warning("%s status is not ready: %s", self.worker_id, reported)
        return False


def _send_completion_request(
    payload: Dict[str, Any],
    timeout: int = 180,
) -> requests.Response:
    """POST ``payload`` as JSON to the local ``/v1/completions`` endpoint.

    Args:
        payload: Request body, serialized by ``requests`` via ``json=``.
        timeout: Value forwarded to ``requests.post(timeout=...)``.

    Returns:
        The raw response; status checking is left to the caller.
    """
    # Timestamp the send so request timing can be read from the test output.
    print(f"Sending request: {time.time()}")

    return requests.post(
        "http://localhost:8000/v1/completions",
        headers={"Content-Type": "application/json"},
        json=payload,
        timeout=timeout,
    )


@pytest.fixture(scope="module")
def runtime_services(request):
    """Provide NATS and etcd servers shared by every test in this module.

    Yields:
        A ``(nats, etcd)`` tuple of the entered server context managers.
        NATS is entered first and exited last.
    """
    with NatsServer(request) as nats, EtcdServer(request) as etcd:
        yield nats, etcd


@pytest.fixture(scope="module")
def start_services(request, runtime_services):
    """Start frontend and worker processes once for this module's tests.

    Depends on ``runtime_services`` so that fixture is set up first. The
    frontend context is entered before the worker context, and both stay
    active until the module's tests have finished.
    """
    with DynamoFrontendProcess(request):
        logger.info("Frontend started for tests")
        # The worker is started only after the frontend context is entered.
        with MockWorkerProcess(request):
            logger.info("Worker started for tests")
            # Nothing is yielded: tests use this fixture only for its side effects.
            yield


@pytest.mark.usefixtures("start_services")
def test_completion_string_prompt() -> None:
    """A plain string prompt must be accepted with HTTP 200."""
    response = _send_completion_request(
        {
            "model": TEST_MODEL,
            "prompt": "Tell me about Mars",
            "max_tokens": 2000,
        }
    )

    status = response.status_code
    assert (
        status == 200
    ), f"Completion request failed with status {status}: {response.text}"


170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
@pytest.mark.usefixtures("start_services")
def test_completion_empty_array_prompt() -> None:
    """An empty prompt array must be rejected with HTTP 400."""
    payload: Dict[str, Any] = {
        "model": TEST_MODEL,
        "prompt": [],
        "max_tokens": 2000,
    }

    response = _send_completion_request(payload)

    # The message keeps a space before the status code so failures read
    # "... but got 500: <body>" rather than "... but got500: <body>".
    assert response.status_code == 400, (
        f"Completion request should have failed with status 400 but got "
        f"{response.status_code}: {response.text}"
    )


186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@pytest.mark.usefixtures("start_services")
def test_completion_single_element_array_prompt() -> None:
    """A one-element prompt array must be accepted with HTTP 200."""
    response = _send_completion_request(
        {
            "model": TEST_MODEL,
            "prompt": ["Tell me about Mars"],
            "max_tokens": 2000,
        }
    )

    status = response.status_code
    assert (
        status == 200
    ), f"Completion request failed with status {status}: {response.text}"


@pytest.mark.usefixtures("start_services")
def test_completion_multi_element_array_prompt() -> None:
    """A multi-element prompt array must yield one choice per prompt."""
    prompts = [
        "Tell me about Mars",
        "Tell me about Ceres",
        "Tell me about Jupiter",
    ]
    payload: Dict[str, Any] = {
        "model": TEST_MODEL,
        "prompt": prompts,
        "max_tokens": 300,
    }

    response = _send_completion_request(payload)

    # Check the status before decoding the body: an error response may not be
    # JSON, and the assertion message is more useful than a decode error.
    assert response.status_code == 200, (
        f"Completion request failed with status "
        f"{response.status_code}: {response.text}"
    )

    response_data = response.json()

    expected_choices = len(prompts)
    choices = len(response_data.get("choices", []))

    assert (
        expected_choices == choices
    ), f"Expected {expected_choices} choices, got {choices}"