test_prompt_embeds.py 7.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
End-to-end tests for prompt embeddings support in Dynamo.

These tests validate behavior that cannot be covered by Rust unit tests:
- Streaming responses with embeddings
- Python-side tensor decoding errors
- Usage statistics from worker (the v2.0.4 bug fix)
- Large payload handling through NATS
- Concurrent request handling

Validation tests (base64, size limits, empty prompt) are covered by Rust unit tests
in lib/llm/src/protocols/openai/completions.rs
"""

import base64
import concurrent.futures
import io
import logging

import pytest
import torch
from openai import OpenAI

logger = logging.getLogger(__name__)

# Test model - small and fast for CI
TEST_MODEL = "Qwen/Qwen3-0.6B"


@pytest.fixture
def dynamo_client():
    """Create OpenAI client pointing to Dynamo frontend."""
    return OpenAI(
        api_key="EMPTY",
        base_url="http://localhost:8000/v1",
    )


def create_embeddings_base64(shape: tuple[int, ...]) -> str:
    """Create random embeddings tensor and return as base64-encoded PyTorch format."""
    embeddings = torch.randn(*shape, dtype=torch.float32)
    buffer = io.BytesIO()
    torch.save(embeddings, buffer)
    buffer.seek(0)
    return base64.b64encode(buffer.read()).decode("utf-8")


@pytest.mark.integration
@pytest.mark.vllm
@pytest.mark.nightly
@pytest.mark.gpu_1
@pytest.mark.model(TEST_MODEL)
class TestPromptEmbedsE2E:
    """
    End-to-end tests for prompt embeddings.

    These tests require a running Dynamo instance with vLLM backend.
    They validate behavior that Rust unit tests cannot cover.
    """

    def test_streaming_with_embeddings(self, dynamo_client):
        """
        Test streaming responses work correctly with embeddings.

        This is E2E only - Rust tests can't verify streaming behavior.
        """
        embeddings_base64 = create_embeddings_base64((10, 1024))

        stream = dynamo_client.completions.create(
            model=TEST_MODEL,
            prompt="",
            max_tokens=10,
            stream=True,
            extra_body={"prompt_embeds": embeddings_base64},
        )

        chunks = list(stream)

        assert len(chunks) > 0, "Should receive at least one chunk"
        # Last chunk should have finish_reason
        if chunks[-1].choices:
            assert chunks[-1].choices[0].finish_reason is not None

    def test_invalid_tensor_data_rejected(self, dynamo_client):
        """
        Test that invalid tensor data is properly rejected by Python decoder.

        This tests the Python-side torch.load() error handling, which
        Rust validation cannot cover (Rust only checks base64 and size).
        """
        # Create data that passes Rust validation (valid base64, >100 bytes)
        # but fails Python torch.load()
        invalid_data = b"this is not a valid pytorch tensor format!" * 10
        invalid_base64 = base64.b64encode(invalid_data).decode("utf-8")

        with pytest.raises(Exception) as exc_info:
            dynamo_client.completions.create(
                model=TEST_MODEL,
                prompt="",
                max_tokens=5,
                extra_body={"prompt_embeds": invalid_base64},
            )

        error_msg = str(exc_info.value).lower()
        assert any(
            keyword in error_msg
            for keyword in ["pytorch", "tensor", "invalid", "decode", "error"]
        ), f"Expected tensor decode error, got: {error_msg}"

    def test_usage_prompt_tokens_not_zero(self, dynamo_client):
        """
        CRITICAL REGRESSION TEST: Ensure prompt_tokens is correctly reported.

        This validates the v2.0.4 fix where prompt_tokens was incorrectly
        reported as 0 when using embeddings. The worker extracts sequence
        length from tensor shape and includes it in completion_usage.

        Rust tests cannot verify this - it requires E2E validation.
        """
        sequence_length = 20
        embeddings_base64 = create_embeddings_base64((sequence_length, 1024))

        response = dynamo_client.completions.create(
            model=TEST_MODEL,
            prompt="",
            max_tokens=3,
            extra_body={"prompt_embeds": embeddings_base64},
        )

        assert response.usage is not None, "Should have usage statistics"
        assert (
            response.usage.prompt_tokens != 0
        ), "BUG REGRESSION: prompt_tokens is 0! This was the bug in v2.0.3."
        assert (
            response.usage.prompt_tokens == sequence_length
        ), f"Expected prompt_tokens={sequence_length}, got {response.usage.prompt_tokens}"
        assert response.usage.total_tokens == (
            response.usage.prompt_tokens + response.usage.completion_tokens
        ), "total_tokens should equal prompt_tokens + completion_tokens"

    def test_large_embeddings_through_nats(self, dynamo_client):
        """
        Test large embeddings are handled correctly through NATS.

        This validates the NATS max_payload configuration (15MB) handles
        large embedding payloads. Rust unit tests can't test this E2E path.
        """
        # Create ~7MB embeddings (well under 10MB limit, but large enough to stress NATS)
        large_shape = (1700, 1024)  # ~6.6MB of float32 data
        large_embeds = torch.randn(large_shape, dtype=torch.float32)

        buffer = io.BytesIO()
        torch.save(large_embeds, buffer)
        buffer.seek(0)
        large_bytes = buffer.read()
        large_base64 = base64.b64encode(large_bytes).decode("utf-8")

        logger.info(
            f"Testing large embeddings: {len(large_bytes)/1024/1024:.2f}MB decoded"
        )

        response = dynamo_client.completions.create(
            model=TEST_MODEL,
            prompt="",
            max_tokens=5,
            extra_body={"prompt_embeds": large_base64},
        )

        assert response.choices, "Large embeddings should produce valid response"
        assert len(large_bytes) < 10 * 1024 * 1024, "Test data should be under 10MB"

    def test_concurrent_embeddings_requests(self, dynamo_client):
        """
        Test concurrent requests with embeddings are handled correctly.

        This validates the worker can handle multiple embedding requests
        simultaneously without race conditions or resource conflicts.
        """
        embeddings_base64 = create_embeddings_base64((10, 1024))

        def send_request():
            return dynamo_client.completions.create(
                model=TEST_MODEL,
                prompt="",
                max_tokens=5,
                extra_body={"prompt_embeds": embeddings_base64},
            )

        # Send 5 concurrent requests
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(send_request) for _ in range(5)]
            results = [f.result() for f in concurrent.futures.as_completed(futures)]

        assert len(results) == 5, "All concurrent requests should complete"
        for response in results:
            assert response.choices, "Each response should have choices"
            assert len(response.choices[0].text) > 0, "Each response should have text"