kube.py 12.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict
17
from unittest.mock import MagicMock, patch
18
19

import pytest
20
from kubernetes import client
21
22

from dynamo.planner.kube import KubernetesAPI
23
from dynamo.planner.utils.exceptions import DynamoGraphDeploymentNotFoundError
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43


@pytest.fixture
def mock_config():
    """Patch the ``config`` module that ``dynamo.planner.kube`` imports.

    ``load_incluster_config`` is explicitly replaced with a fresh ``MagicMock``
    so calls made through it by ``KubernetesAPI`` hit a mock. The patch is
    reverted when the requesting test tears down.
    """
    config_patcher = patch("dynamo.planner.kube.config")
    patched_config = config_patcher.start()
    patched_config.load_incluster_config = MagicMock()
    try:
        yield patched_config
    finally:
        config_patcher.stop()


@pytest.fixture
def mock_custom_api():
    """Patch ``client.CustomObjectsApi`` and yield the object it returns when called.

    Tests configure return values / side effects on the yielded mock and assert
    on the calls ``KubernetesAPI`` makes through it. The patch is reverted when
    the requesting test tears down.
    """
    api_patcher = patch("dynamo.planner.kube.client.CustomObjectsApi")
    patched_api_class = api_patcher.start()
    try:
        yield patched_api_class.return_value
    finally:
        api_patcher.stop()


@pytest.fixture
def k8s_api(mock_custom_api, mock_config):
    """Build a ``KubernetesAPI`` without an explicit namespace.

    Requesting ``mock_custom_api`` and ``mock_config`` guarantees the client
    class and kube config are already patched when the constructor runs.
    """
    api_under_test = KubernetesAPI()
    return api_under_test


44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@pytest.fixture
def k8s_api_with_namespace(mock_custom_api, mock_config):
    """Build a ``KubernetesAPI`` pinned to the ``"test-namespace"`` namespace.

    Requesting ``mock_custom_api`` and ``mock_config`` guarantees the client
    class and kube config are already patched when the constructor runs.
    """
    namespaced_api = KubernetesAPI(k8s_namespace="test-namespace")
    return namespaced_api


def test_kubernetes_api_init_with_namespace(mock_custom_api, mock_config):
    """An explicit ``k8s_namespace`` argument becomes ``current_namespace``."""
    expected_namespace = "custom-namespace"
    assert (
        KubernetesAPI(k8s_namespace=expected_namespace).current_namespace
        == expected_namespace
    )


def test_kubernetes_api_init_without_namespace(mock_custom_api, mock_config):
    """Without ``k8s_namespace`` the default namespace logic yields "default"."""
    resolved_namespace = KubernetesAPI().current_namespace
    assert resolved_namespace == "default"


def test_get_graph_deployment_from_name(k8s_api, mock_custom_api):
    """_get_graph_deployment_from_name fetches the DGD custom object by name."""
    deployment_name = "test-deployment"
    fake_dgd = {"metadata": {"name": deployment_name}}
    get_object = mock_custom_api.get_namespaced_custom_object
    get_object.return_value = fake_dgd

    fetched = k8s_api._get_graph_deployment_from_name(deployment_name)

    assert fetched == fake_dgd
    expected_call = dict(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeployments",
        name=deployment_name,
    )
    get_object.assert_called_once_with(**expected_call)


79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
def test_update_service_replicas_uses_dgdsa_scale(k8s_api, mock_custom_api):
    """update_service_replicas scales through the DGDSA Scale subresource."""
    scale_patch = mock_custom_api.patch_namespaced_custom_object_scale
    scale_patch.return_value = None

    k8s_api.update_service_replicas("test-deployment", "Frontend", 3)

    # The adapter is addressed as "<deployment>-<service>", service lower-cased.
    scale_patch.assert_called_once_with(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeploymentscalingadapters",
        name="test-deployment-frontend",
        body={"spec": {"replicas": 3}},
    )
    # A successful Scale patch means the DGD itself is never patched.
    mock_custom_api.patch_namespaced_custom_object.assert_not_called()


def test_update_service_replicas_fallback_to_dgd(k8s_api, mock_custom_api):
    """Test that update_service_replicas falls back to DGD when DGDSA not found.

    A 404 from the Scale subresource means the scaling adapter does not exist,
    so the replica count must be written directly into the DGD spec instead.
    """
    # DGDSA doesn't exist (404)
    mock_custom_api.patch_namespaced_custom_object_scale.side_effect = (
        client.ApiException(status=404)
    )
    mock_custom_api.patch_namespaced_custom_object.return_value = None

    k8s_api.update_service_replicas("test-deployment", "test-component", 1)

    # Should have tried the DGDSA Scale subresource first, on the adapter
    # named "<deployment>-<service>"
    mock_custom_api.patch_namespaced_custom_object_scale.assert_called_once_with(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeploymentscalingadapters",
        name="test-deployment-test-component",
        body={"spec": {"replicas": 1}},
    )

    # Should fall back to DGD patch
    mock_custom_api.patch_namespaced_custom_object.assert_called_once_with(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeployments",
        name="test-deployment",
        body={"spec": {"services": {"test-component": {"replicas": 1}}}},
    )


def test_update_service_replicas_propagates_other_errors(k8s_api, mock_custom_api):
    """A non-404 error from the Scale API is re-raised with no DGD fallback."""
    server_error = client.ApiException(status=500, reason="Internal Server Error")
    mock_custom_api.patch_namespaced_custom_object_scale.side_effect = server_error

    with pytest.raises(client.ApiException) as raised:
        k8s_api.update_service_replicas("test-deployment", "test-component", 1)

    assert raised.value.status == 500
    # Only a 404 (missing adapter) should route to the DGD patch path.
    mock_custom_api.patch_namespaced_custom_object.assert_not_called()


def test_update_graph_replicas_calls_update_service_replicas(k8s_api, mock_custom_api):
    """Test that deprecated update_graph_replicas calls update_service_replicas.

    The deprecated entry point must take the same path as the new one: a
    Scale-subresource patch on the scaling adapter, with no DGD fallback when
    that patch succeeds.
    """
    mock_custom_api.patch_namespaced_custom_object_scale.return_value = None

    # Use the deprecated method
    k8s_api.update_graph_replicas("test-deployment", "test-component", 1)

    # Should delegate to update_service_replicas which uses Scale API
    mock_custom_api.patch_namespaced_custom_object_scale.assert_called_once_with(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeploymentscalingadapters",
        name="test-deployment-test-component",
        body={"spec": {"replicas": 1}},
    )
    # Scale patch succeeded, so the DGD itself must not be patched
    mock_custom_api.patch_namespaced_custom_object.assert_not_called()


def test_update_dgd_replicas_directly(k8s_api, mock_custom_api):
    """Test the internal _update_dgd_replicas method.

    It should merge-patch the service's replica count into the DGD spec.
    """
    mock_custom_api.patch_namespaced_custom_object.return_value = None

    k8s_api._update_dgd_replicas("test-deployment", "test-component", 1)

    mock_custom_api.patch_namespaced_custom_object.assert_called_once_with(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeployments",
        name="test-deployment",
        body={"spec": {"services": {"test-component": {"replicas": 1}}}},
    )


170
def test_is_deployment_ready_true(k8s_api, mock_custom_api):
    """Test is_deployment_ready method when deployment is ready"""
    # is_deployment_ready is synchronous and receives the deployment dict
    # directly, so no API mocking or event loop is needed here.
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }

    result = k8s_api.is_deployment_ready(mock_deployment)
    assert result is True
184
185
186


def test_is_deployment_ready_false(k8s_api, mock_custom_api):
    """Test is_deployment_ready method when deployment is not ready"""
    # is_deployment_ready is synchronous and receives the deployment dict
    # directly, so no API mocking or event loop is needed here.
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {
                    "type": "Ready",
                    "status": "False",
                    "message": "Deployment is not ready",
                }
            ]
        }
    }

    result = k8s_api.is_deployment_ready(mock_deployment)
    assert result is False
202
203
204


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_success(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment becomes ready"""
    # Deployment already reports Ready on the first fetch
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }

    # get_graph_deployment is a coroutine method, so patch.object substitutes
    # an AsyncMock whose awaited result is mock_deployment
    with patch.object(
        k8s_api, "get_graph_deployment", return_value=mock_deployment
    ) as mock_get:
        # Test with minimal attempts and delay for faster testing
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )

    # Ready on the first poll: no further attempts should be made
    mock_get.assert_awaited_once()

223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240

@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_timeout(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment times out"""
    # get_graph_deployment always returns a deployment whose Ready condition
    # is "False", so every attempt fails
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {
                    "type": "Ready",
                    "status": "False",
                    "message": "Deployment is not ready",
                }
            ]
        }
    }

    # Mock the method on the instance
    with patch.object(k8s_api, "get_graph_deployment", return_value=mock_deployment):
        # Test with minimal attempts and delay for faster testing
        with pytest.raises(TimeoutError) as exc_info:
            await k8s_api.wait_for_graph_deployment_ready(
                "test-deployment", max_attempts=2, delay_seconds=0.1
            )

        assert "is not ready after" in str(exc_info.value)


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_not_found(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment is not found.

    A 404 from the custom-objects API must surface as
    DynamoGraphDeploymentNotFoundError carrying the deployment name and the
    namespace that was queried.
    """
    mock_custom_api.get_namespaced_custom_object.side_effect = client.ApiException(
        status=404
    )

    # Test with minimal attempts and delay for faster testing
    with pytest.raises(DynamoGraphDeploymentNotFoundError) as exc_info:
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )

    # Validate the exception fields
    exception = exc_info.value
    assert exception.deployment_name == "test-deployment"
    assert exception.namespace == k8s_api.current_namespace
269
270
271
272


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_no_conditions(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment has no conditions"""
    # get_graph_deployment returns a status with no conditions at all, which
    # must never be treated as Ready
    mock_deployment: Dict[str, Any] = {"status": {}}

    with patch.object(k8s_api, "get_graph_deployment", return_value=mock_deployment):
        # Test with minimal attempts and delay for faster testing
        with pytest.raises(TimeoutError) as exc_info:
            await k8s_api.wait_for_graph_deployment_ready(
                "test-deployment", max_attempts=2, delay_seconds=0.1
            )

        assert "is not ready after" in str(exc_info.value)
285
286
287
288
289
290


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_on_second_attempt(
    k8s_api, mock_custom_api
):
    """Test wait_for_graph_deployment_ready when deployment becomes ready on second attempt"""
    # Mock the _get_graph_deployment_from_name response to return not ready first, then ready
    mock_deployment_not_ready: Dict[str, Any] = {
        "status": {
            "conditions": [
                {
                    "type": "Ready",
                    "status": "False",
                    "message": "Deployment is not ready",
                }
            ]
        }
    }
    mock_deployment_ready: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }

    with patch.object(
        k8s_api,
        "_get_graph_deployment_from_name",
        side_effect=[mock_deployment_not_ready, mock_deployment_ready],
    ) as mock_get:
        # Test with minimal attempts and delay for faster testing
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )

    # Both responses must have been consumed: the first (not ready) forced a
    # retry, and the second (ready) ended the wait
    assert mock_get.call_count == 2
321
322
323


@pytest.mark.asyncio
async def test_get_graph_deployment(k8s_api, mock_custom_api):
    """Test get_graph_deployment.

    The async accessor should delegate to _get_graph_deployment_from_name and
    return its result unchanged.
    """
    mock_deployment = {"metadata": {"name": "parent-dgd"}}

    with patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment
    ) as mock_get:
        result = await k8s_api.get_graph_deployment("parent-dgd")

        assert result == mock_deployment
        mock_get.assert_called_once_with("parent-dgd")
335
336
337


@pytest.mark.asyncio
async def test_get_graph_deployment_not_found(k8s_api, mock_custom_api):
    """Test get_graph_deployment when deployment is not found.

    A 404 from the custom-objects API is translated into
    DynamoGraphDeploymentNotFoundError with the requested name and namespace.
    """
    mock_custom_api.get_namespaced_custom_object.side_effect = client.ApiException(
        status=404
    )
    with pytest.raises(DynamoGraphDeploymentNotFoundError) as exc_info:
        await k8s_api.get_graph_deployment("parent-dgd")

    exception = exc_info.value
    assert exception.deployment_name == "parent-dgd"
    assert exception.namespace == "default"