test_dgdr_validation.py 12.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Webhook validation and version conversion tests for DGDR v1beta1.

These tests verify that:
- The admission webhook correctly accepts/rejects DGDR specs (TestDGDRValidation)
- v1alpha1 resources are transparently converted to v1beta1 (TestDGDRVersionConversion)

No GPU or cluster profiling is required (gpu_0 only).  The only prerequisite is a
running Kubernetes cluster with the Dynamo operator CRDs and webhooks installed.

Run:
  pytest tests/dgdr/test_dgdr_validation.py -m gpu_0 -v --dgdr-namespace=default --dgdr-image=<image>

Test markers:
  gpu_0       No GPU required
  nightly     Requires live K8s cluster (not run in general pre-merge CI)
  integration Integration-level (uses live webhook)
  k8s         Kubernetes-dependent tests (applied to both test classes in this file)
"""

from __future__ import annotations

import json
import logging

import pytest
import yaml
from kubernetes_asyncio.client import exceptions as k8s_exceptions

from tests.dgdr.conftest import (
    DGDR_API_VERSION,
    DGDR_SHORT_NAME,
    _run_kubectl,
    build_dgdr_manifest,
    unique_dgdr_name,
)
from tests.utils.managed_deployment import ManagedDGDR

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# ── Group 1: Webhook Validation (gpu_0, no profiling required) ──────────────
# ---------------------------------------------------------------------------


@pytest.mark.gpu_0
@pytest.mark.nightly
@pytest.mark.integration
@pytest.mark.k8s
class TestDGDRValidation:
    """
    Admission-webhook and CRD-registration checks for DGDR v1beta1.

    The accept/reject tests submit a manifest through
    ``managed_dgdr.server_dry_run`` and expect a rejection to surface as a
    ``kubernetes_asyncio`` ``ApiException`` (presumably nothing is persisted
    by a dry-run -- confirm against ``ManagedDGDR``).

    The remaining tests shell out to kubectl to inspect the CRD itself
    (storage version, short name, printer columns).  Note that
    ``test_kubectl_get_columns_schema`` does create a real resource via
    ``dgdr_factory``.
    """

    def test_missing_model_rejected(
        self, managed_dgdr: ManagedDGDR, dgdr_image: str
    ) -> None:
        """
        A DGDR without spec.model must be rejected by the webhook.
        The model field is the only hard-required spec field in v1beta1.
        """
        manifest = build_dgdr_manifest(
            unique_dgdr_name("no-model"),
            model="",  # placeholder only; the key is removed below
            image=dgdr_image,
        )
        # Remove the key so the field is absent rather than empty.  pop() with
        # a default also tolerates a builder that already omits an empty model,
        # where a bare `del` would abort the test with KeyError.
        manifest["spec"].pop("model", None)

        with pytest.raises(k8s_exceptions.ApiException):
            managed_dgdr.run(managed_dgdr.server_dry_run(manifest))

    def test_thorough_with_auto_backend_rejected(
        self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
    ) -> None:
        """
        searchStrategy: thorough + backend: auto must be rejected.
        'thorough' sweeps real GPU engines and requires a concrete backend.

        Besides the rejection itself, the error text must mention at least one
        of "auto", "backend" or "thorough" (case-insensitive).
        """
        manifest = build_dgdr_manifest(
            unique_dgdr_name("thorough-auto"),
            model=dgdr_model,
            image=dgdr_image,
            backend="auto",
            search_strategy="thorough",
        )
        with pytest.raises(k8s_exceptions.ApiException) as exc_info:
            managed_dgdr.run(managed_dgdr.server_dry_run(manifest))
        error_body = str(exc_info.value)
        lowered = error_body.lower()  # lower-case once, match many
        assert any(
            keyword in lowered for keyword in ("auto", "backend", "thorough")
        ), f"Error message should mention backend/thorough incompatibility. Got: {error_body}"

    def test_invalid_backend_rejected(
        self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
    ) -> None:
        """
        An unknown backend value must be rejected by the admission webhook.
        Valid values: auto, vllm, sglang, trtllm.
        """
        manifest = build_dgdr_manifest(
            unique_dgdr_name("bad-backend"),
            model=dgdr_model,
            image=dgdr_image,
            backend="unknown_backend",
        )
        with pytest.raises(k8s_exceptions.ApiException):
            managed_dgdr.run(managed_dgdr.server_dry_run(manifest))

    def test_invalid_search_strategy_rejected(
        self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
    ) -> None:
        """
        An unknown searchStrategy value must be rejected by the admission webhook.
        """
        manifest = build_dgdr_manifest(
            unique_dgdr_name("bad-strategy"),
            model=dgdr_model,
            image=dgdr_image,
            search_strategy="superfast",  # not a valid strategy
        )
        with pytest.raises(k8s_exceptions.ApiException):
            managed_dgdr.run(managed_dgdr.server_dry_run(manifest))

    def test_invalid_optimization_type_rejected(
        self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
    ) -> None:
        """
        An invalid sla.optimizationType value must be rejected by the
        admission webhook. Valid values: latency, throughput.
        """
        manifest = build_dgdr_manifest(
            unique_dgdr_name("bad-opt-type"),
            model=dgdr_model,
            image=dgdr_image,
            sla={"optimizationType": "cost"},  # not valid
        )
        with pytest.raises(k8s_exceptions.ApiException):
            managed_dgdr.run(managed_dgdr.server_dry_run(manifest))

    def test_valid_minimal_dgdr_accepted(
        self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
    ) -> None:
        """
        A DGDR with only the required fields (model + image) must pass validation.
        All other fields have defaults and are optional.
        """
        manifest = build_dgdr_manifest(
            unique_dgdr_name("valid-minimal"),
            model=dgdr_model,
            image=dgdr_image,
        )
        # Should not raise — accepted by the webhook
        managed_dgdr.run(managed_dgdr.server_dry_run(manifest))

    def test_valid_full_spec_accepted(
        self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
    ) -> None:
        """
        A fully-specified v1beta1 DGDR should pass webhook validation.
        Exercises every top-level optional field.
        """
        manifest = build_dgdr_manifest(
            unique_dgdr_name("valid-full"),
            model=dgdr_model,
            image=dgdr_image,
            backend="vllm",
            search_strategy="rapid",
            sla={"ttft": 200.0, "itl": 20.0},
            workload={"isl": 3000, "osl": 150},
            features={
                "planner": {"plannerPreDeploymentSweeping": "rapid"},
                "mocker": {"enabled": False},
            },
            hardware={"numGpusPerNode": 8},
            auto_apply=True,
        )
        # Should not raise — accepted by the webhook
        managed_dgdr.run(managed_dgdr.server_dry_run(manifest))

    def test_v1beta1_is_storage_version(self, dgdr_namespace: str) -> None:
        """
        The CRD's storage version must be v1beta1 (it is the conversion hub).

        The check reads the entry of ``spec.versions`` flagged ``storage: true``.
        ``status.storedVersions`` is deliberately not used: it records every
        version that has ever been persisted, so it can contain v1beta1
        without v1beta1 being the current storage version.
        """
        result = _run_kubectl(
            [
                "get",
                "crd",
                "dynamographdeploymentrequests.nvidia.com",
                "-o",
                "jsonpath={.spec.versions[?(@.storage==true)].name}",
            ],
            check=False,
        )
        assert result.returncode == 0, f"Failed to get CRD: {result.stderr}"
        # A CRD has exactly one storage version, so the output is a single name.
        storage_version = result.stdout.strip()
        assert (
            storage_version == "v1beta1"
        ), f"v1beta1 should be the storage version. Got: {result.stdout}"

    def test_kubectl_shortname_dgdr_works(self, dgdr_namespace: str) -> None:
        """
        kubectl get dgdr must work (tests the shortName 'dgdr' in the CRD).
        """
        result = _run_kubectl(
            ["get", DGDR_SHORT_NAME, "-n", dgdr_namespace, "--ignore-not-found"],
            check=False,
        )
        assert (
            result.returncode == 0
        ), f"kubectl get dgdr failed (shortname may not be registered). stderr: {result.stderr}"

    def test_kubectl_get_columns_schema(
        self, dgdr_namespace: str, dgdr_image: str, dgdr_model: str, dgdr_factory
    ) -> None:
        """
        kubectl get dgdr should output the columns defined in the CRD:
        NAME, MODEL, BACKEND, PHASE, PROFILING, DGD, AGE.

        Only NAME, MODEL, BACKEND and PHASE are asserted here; the remaining
        columns are not checked by this test.
        """
        name = unique_dgdr_name("col-test")
        manifest = build_dgdr_manifest(name, model=dgdr_model, image=dgdr_image)
        # Creates a real resource so kubectl has a row (and a header) to print.
        dgdr_factory(manifest)

        result = _run_kubectl(
            ["get", DGDR_SHORT_NAME, name, "-n", dgdr_namespace],
            check=False,
        )
        assert result.returncode == 0, f"kubectl get dgdr failed: {result.stderr}"

        output_lines = result.stdout.splitlines()
        # Fail with an accurate message instead of a misleading "column missing"
        # when kubectl printed nothing at all.
        assert output_lines, "kubectl get dgdr produced no output (no header row)"
        header = output_lines[0].upper()
        for col in ("NAME", "MODEL", "BACKEND", "PHASE"):
            assert (
                col in header
            ), f"Expected column {col!r} in kubectl output header. Got: {header}"


# ---------------------------------------------------------------------------
# ── Group 2: v1alpha1 → v1beta1 Version Conversion ─────────────────────────
# ---------------------------------------------------------------------------


@pytest.mark.gpu_0
@pytest.mark.nightly
@pytest.mark.integration
@pytest.mark.k8s
class TestDGDRVersionConversion:
    """
    Tests that v1alpha1 DGDR resources can be submitted and are stored
    transparently as v1beta1 (conversion hub).  No profiling required.

    Both tests tolerate a clean rejection / "not found" from kubectl, but
    treat an internal server error as a failure.
    """

    @staticmethod
    def _assert_no_internal_error(result, action: str) -> None:
        """
        Fail if a kubectl result's stderr reports an internal server error.

        kubectl exits with 1 for almost every failure, so the return code
        alone cannot tell an expected rejection from an HTTP 500.  The stderr
        text is inspected instead: kubectl prints the API status reason as
        ``Error from server (InternalError)`` and the server message usually
        starts with ``Internal error occurred``.

        Args:
            result: Object returned by ``_run_kubectl``; only ``stderr`` is
                read (presumably a ``subprocess.CompletedProcess`` -- confirm
                against the helper).
            action: Short description used in the failure message.
        """
        stderr = result.stderr or ""
        assert (
            "InternalError" not in stderr and "Internal error occurred" not in stderr
        ), f"API server reported an internal error while {action}: {stderr}"

    def test_v1alpha1_dgdr_can_be_applied(
        self, dgdr_namespace: str, dgdr_image: str, dgdr_model: str, dgdr_factory
    ) -> None:
        """
        A v1alpha1 DynamoGraphDeploymentRequest should be accepted and
        automatically converted to v1beta1 storage by the conversion webhook.

        Note: v1alpha1 manifests use a different spec shape (profilingConfig
        instead of image) so we must use kubectl here rather than the
        v1beta1-only ManagedDGDR client.
        """
        name = unique_dgdr_name("v1a1")
        v1alpha1_manifest = {
            "apiVersion": "nvidia.com/v1alpha1",
            "kind": "DynamoGraphDeploymentRequest",
            "metadata": {"name": name},
            "spec": {
                "model": dgdr_model,
                "backend": "vllm",
                "profilingConfig": {
                    "profilerImage": dgdr_image,
                },
            },
        }
        yaml_str = yaml.dump(v1alpha1_manifest)
        result = _run_kubectl(
            ["apply", "-n", dgdr_namespace, "-f", "-"], input=yaml_str, check=False
        )
        if result.returncode == 0:
            # Register for cleanup without re-creating (resource already exists)
            dgdr_factory.register_for_cleanup(name)
        # Either accepted (0) or rejected for a known conversion reason (1).
        assert result.returncode in (
            0,
            1,
        ), f"Unexpected error applying v1alpha1 DGDR: {result.stderr}"
        if result.returncode != 0:
            # Exit code 1 also covers HTTP 500s, so rule those out explicitly.
            self._assert_no_internal_error(result, "applying v1alpha1 DGDR")

    def test_v1beta1_get_on_v1alpha1_object(
        self,
        managed_dgdr: ManagedDGDR,
        dgdr_namespace: str,
        dgdr_image: str,
        dgdr_model: str,
        dgdr_factory,
    ) -> None:
        """
        A resource stored as v1beta1 must be retrievable as v1alpha1 via conversion.

        The object is created from a v1beta1 manifest, read back through
        ManagedDGDR (v1beta1), then read again through kubectl using the fully
        qualified v1alpha1 resource name.
        """
        name = unique_dgdr_name("conv-get")
        manifest = build_dgdr_manifest(name, model=dgdr_model, image=dgdr_image)
        dgdr_factory(manifest)

        # Retrieve as v1beta1 (storage version) via ManagedDGDR
        obj_v1beta1 = managed_dgdr.run(managed_dgdr.get(name))
        assert obj_v1beta1 is not None
        assert obj_v1beta1["apiVersion"] == DGDR_API_VERSION

        # Retrieve as v1alpha1 (should trigger conversion webhook).
        # Must use kubectl here since ManagedDGDR targets v1beta1 only.
        result = _run_kubectl(
            [
                "get",
                "dynamographdeploymentrequests.v1alpha1.nvidia.com",
                name,
                "-n",
                dgdr_namespace,
                "-o",
                "json",
            ],
            check=False,
        )
        # If the conversion webhook is working, we get a 200 with v1alpha1 resource.
        # If not registered, we may get a 404 - that is also acceptable here as
        # some cluster configs only register v1beta1.
        assert result.returncode in (
            0,
            1,
        ), f"Unexpected failure getting v1alpha1 DGDR: {result.stderr}"
        if result.returncode != 0:
            # A 404 is tolerated; a failing conversion webhook (HTTP 500) is not.
            self._assert_no_internal_error(result, "getting v1alpha1 DGDR")
            return

        obj_v1alpha1 = json.loads(result.stdout)
        assert (
            obj_v1alpha1["apiVersion"] == "nvidia.com/v1alpha1"
        ), "Retrieved object should have v1alpha1 apiVersion"