# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from typing import Any, Dict
from unittest.mock import MagicMock, patch

import pytest

from dynamo.planner.kube import KubernetesAPI


@pytest.fixture
def mock_config():
    """Yield a patched ``dynamo.planner.kube.config`` with a stubbed in-cluster loader.

    The patch stays active for as long as the requesting test runs and is
    undone when the fixture is torn down.
    """
    config_patcher = patch("dynamo.planner.kube.config")
    with config_patcher as patched_config:
        # Pin the in-cluster loader to an explicit MagicMock.
        patched_config.load_incluster_config = MagicMock()
        yield patched_config


@pytest.fixture
def mock_custom_api():
    """Yield the object the patched ``CustomObjectsApi`` class returns when called.

    Tests configure return values and inspect calls on this object.
    """
    with patch("dynamo.planner.kube.client.CustomObjectsApi") as api_cls:
        api_instance = api_cls.return_value
        yield api_instance


@pytest.fixture
def k8s_api(mock_custom_api, mock_config):
    """Return a ``KubernetesAPI`` built with default arguments.

    Requesting ``mock_custom_api`` and ``mock_config`` keeps both patches
    active while the instance is constructed and used.
    """
    api = KubernetesAPI()
    return api


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@pytest.fixture
def k8s_api_with_namespace(mock_custom_api, mock_config):
    """Return a ``KubernetesAPI`` constructed with ``k8s_namespace="test-namespace"``.

    Requesting ``mock_custom_api`` and ``mock_config`` keeps both patches
    active while the instance is constructed and used.
    """
    namespace = "test-namespace"
    return KubernetesAPI(k8s_namespace=namespace)


def test_kubernetes_api_init_with_namespace(mock_custom_api, mock_config):
    """An explicitly passed namespace is exposed as ``current_namespace``."""
    expected_namespace = "custom-namespace"

    instance = KubernetesAPI(k8s_namespace=expected_namespace)

    assert instance.current_namespace == expected_namespace


def test_kubernetes_api_init_without_namespace(mock_custom_api, mock_config):
    """With no namespace argument, ``current_namespace`` is expected to be "default"."""
    instance = KubernetesAPI()

    # No namespace was supplied, so the value comes from the class's own fallback.
    namespace = instance.current_namespace
    assert namespace == "default"


def test_get_graph_deployment_from_name(k8s_api, mock_custom_api):
    """``_get_graph_deployment_from_name`` returns the custom-object lookup result.

    Also checks that the lookup is made exactly once with the expected
    group, version, namespace, plural and name.
    """
    stub_response = {"metadata": {"name": "test-deployment"}}
    lookup = mock_custom_api.get_namespaced_custom_object
    lookup.return_value = stub_response

    fetched = k8s_api._get_graph_deployment_from_name("test-deployment")

    expected_call = {
        "group": "nvidia.com",
        "version": "v1alpha1",
        "namespace": k8s_api.current_namespace,
        "plural": "dynamographdeployments",
        "name": "test-deployment",
    }
    assert fetched == stub_response
    lookup.assert_called_once_with(**expected_call)


78
@pytest.mark.asyncio
async def test_is_deployment_ready_true(k8s_api, mock_custom_api):
    """Test is_deployment_ready method when deployment is ready.

    The stubbed deployment carries a "Ready" condition with status "True",
    so ``is_deployment_ready`` is expected to return ``True``.
    """
    # Deployment payload whose only condition is Ready/"True".
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }

    # Stub the lookup on the instance so it returns the payload above.
    with patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment
    ):
        result = await k8s_api.is_deployment_ready("test-deployment")
        assert result is True
96
97
98


@pytest.mark.asyncio
async def test_is_deployment_ready_false(k8s_api, mock_custom_api):
    """Test is_deployment_ready method when deployment is not ready.

    The stubbed deployment carries a "Ready" condition with status "False",
    so ``is_deployment_ready`` is expected to return ``False``.
    """
    # Deployment payload whose only condition is Ready/"False".
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {
                    "type": "Ready",
                    "status": "False",
                    "message": "Deployment is not ready",
                }
            ]
        }
    }

    # Stub the lookup on the instance so it returns the payload above.
    with patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment
    ):
        result = await k8s_api.is_deployment_ready("test-deployment")
        assert result is False
119

120
121
122
123
124
125
126
127
128
129

@pytest.mark.asyncio
async def test_is_deployment_ready_not_found(k8s_api, mock_custom_api):
    """``is_deployment_ready`` raises ``ValueError`` when the lookup yields ``None``."""
    # Stub the lookup on the instance so it reports no deployment.
    missing_lookup = patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=None
    )
    with missing_lookup:
        with pytest.raises(ValueError) as raised:
            await k8s_api.is_deployment_ready("test-deployment")

        message = str(raised.value)
        assert "not found" in message
130
131
132


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_success(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment becomes ready.

    The stubbed deployment is ready on every lookup, so the wait is expected
    to complete without raising.
    """
    # Deployment payload whose only condition is Ready/"True".
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }

    # Stub the lookup on the instance so it returns the payload above.
    with patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment
    ):
        # Test with minimal attempts and delay for faster testing
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )

153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194

@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_timeout(k8s_api, mock_custom_api):
    """``wait_for_graph_deployment_ready`` raises ``TimeoutError`` if never ready.

    Every lookup returns a deployment whose "Ready" condition is "False".
    """
    not_ready_condition = {
        "type": "Ready",
        "status": "False",
        "message": "Deployment is not ready",
    }
    never_ready: Dict[str, Any] = {"status": {"conditions": [not_ready_condition]}}

    # Stub the lookup on the instance so it always returns the not-ready payload.
    stubbed_lookup = patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=never_ready
    )
    with stubbed_lookup:
        # Few attempts and a short delay keep the test fast.
        with pytest.raises(TimeoutError) as raised:
            await k8s_api.wait_for_graph_deployment_ready(
                "test-deployment", max_attempts=2, delay_seconds=0.1
            )

        message = str(raised.value)
        assert "is not ready after" in message


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_not_found(k8s_api, mock_custom_api):
    """``wait_for_graph_deployment_ready`` raises ``ValueError`` when the lookup yields ``None``."""
    # Stub the lookup on the instance so it reports no deployment.
    missing_lookup = patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=None
    )
    with missing_lookup:
        # Few attempts and a short delay keep the test fast.
        with pytest.raises(ValueError) as raised:
            await k8s_api.wait_for_graph_deployment_ready(
                "test-deployment", max_attempts=2, delay_seconds=0.1
            )

        message = str(raised.value)
        assert "not found" in message
195
196
197
198


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_no_conditions(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment has no conditions.

    The stubbed deployment has an empty ``status`` mapping, so the wait is
    expected to end with a ``TimeoutError``.
    """
    # Deployment payload with a status but no "conditions" entry.
    mock_deployment: Dict[str, Any] = {"status": {}}

    # Stub the lookup on the instance so it returns the payload above.
    with patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment
    ):
        # Test with minimal attempts and delay for faster testing
        with pytest.raises(TimeoutError) as exc_info:
            await k8s_api.wait_for_graph_deployment_ready(
                "test-deployment", max_attempts=2, delay_seconds=0.1
            )

        assert "is not ready after" in str(exc_info.value)
213
214
215
216
217
218


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_on_second_attempt(
    k8s_api, mock_custom_api
):
    """Test wait_for_graph_deployment_ready when deployment becomes ready on second attempt.

    The stubbed lookup returns a not-ready deployment first and a ready one
    second; the wait is expected to complete without raising.
    """
    # First lookup result: Ready condition is "False".
    mock_deployment_not_ready: Dict[str, Any] = {
        "status": {
            "conditions": [
                {
                    "type": "Ready",
                    "status": "False",
                    "message": "Deployment is not ready",
                }
            ]
        }
    }
    # Second lookup result: Ready condition is "True".
    mock_deployment_ready: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }

    # side_effect hands out the payloads in order, one per lookup call.
    with patch.object(
        k8s_api,
        "_get_graph_deployment_from_name",
        side_effect=[mock_deployment_not_ready, mock_deployment_ready],
    ):
        # Test with minimal attempts and delay for faster testing
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285


@pytest.mark.asyncio
async def test_get_parent_graph_deployment_with_env_var(k8s_api, mock_custom_api):
    """With ``DYN_PARENT_DGD_K8S_NAME`` set, the parent is looked up by that name."""
    expected_parent = {"metadata": {"name": "parent-dgd"}}

    env_patch = patch.dict(os.environ, {"DYN_PARENT_DGD_K8S_NAME": "parent-dgd"})
    lookup_patch = patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=expected_parent
    )
    # Enter the environment patch first, then the lookup stub.
    with env_patch, lookup_patch as lookup:
        parent = await k8s_api.get_parent_graph_deployment()

        assert parent == expected_parent
        lookup.assert_called_once_with("parent-dgd")


@pytest.mark.asyncio
async def test_get_parent_graph_deployment_without_env_var(k8s_api, mock_custom_api):
    """With an empty environment, ``get_parent_graph_deployment`` returns ``None``."""
    # clear=True empties os.environ for the duration of the call.
    empty_env = patch.dict(os.environ, {}, clear=True)
    with empty_env:
        parent = await k8s_api.get_parent_graph_deployment()

    assert parent is None


@pytest.mark.asyncio
async def test_get_graph_deployment_delegates_to_parent(k8s_api, mock_custom_api):
    """``get_graph_deployment`` returns what ``get_parent_graph_deployment`` returns.

    Also checks that the parent lookup is called exactly once.
    """
    expected_parent = {"metadata": {"name": "parent-dgd"}}
    parent_patch = patch.object(
        k8s_api, "get_parent_graph_deployment", return_value=expected_parent
    )

    with parent_patch as parent_lookup:
        deployment = await k8s_api.get_graph_deployment()

        assert deployment == expected_parent
        parent_lookup.assert_called_once()