# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict
from unittest.mock import MagicMock, patch

import pytest
from kubernetes import client

from dynamo.planner.kube import KubernetesAPI
from dynamo.planner.utils.exceptions import DynamoGraphDeploymentNotFoundError


@pytest.fixture
def mock_config():
    """Patch ``dynamo.planner.kube.config`` while a test runs.

    ``load_incluster_config`` on the patched object is set to a fresh
    ``MagicMock``; the patched object itself is yielded to the test.
    """
    config_patcher = patch("dynamo.planner.kube.config")
    with config_patcher as patched_config:
        patched_config.load_incluster_config = MagicMock()
        yield patched_config


@pytest.fixture
def mock_custom_api():
    """Patch ``CustomObjectsApi`` as seen from ``dynamo.planner.kube``.

    Yields the mock's ``return_value``, i.e. the object any call to the
    patched class produces.
    """
    with patch("dynamo.planner.kube.client.CustomObjectsApi") as api_class:
        api_instance = api_class.return_value
        yield api_instance


@pytest.fixture
def k8s_api(mock_custom_api, mock_config):
    """Provide a ``KubernetesAPI`` built with no arguments.

    Requesting ``mock_custom_api`` and ``mock_config`` keeps both patches
    active while the object is constructed.
    """
    api = KubernetesAPI()
    return api


@pytest.fixture
def k8s_api_with_namespace(mock_custom_api, mock_config):
    """Provide a ``KubernetesAPI`` built with ``k8s_namespace="test-namespace"``.

    Requesting ``mock_custom_api`` and ``mock_config`` keeps both patches
    active while the object is constructed.
    """
    namespaced_api = KubernetesAPI(k8s_namespace="test-namespace")
    return namespaced_api


def test_kubernetes_api_init_with_namespace(mock_custom_api, mock_config):
    """An explicit ``k8s_namespace`` is exposed as ``current_namespace``"""
    requested_namespace = "custom-namespace"

    api = KubernetesAPI(k8s_namespace=requested_namespace)

    assert api.current_namespace == requested_namespace


def test_kubernetes_api_init_without_namespace(mock_custom_api, mock_config):
    """With no namespace argument, ``current_namespace`` is expected to be "default" """
    # No namespace is passed, so the value comes from KubernetesAPI's own
    # fallback logic rather than from this test.
    resolved_namespace = KubernetesAPI().current_namespace
    assert resolved_namespace == "default"


def test_get_graph_deployment_from_name(k8s_api, mock_custom_api):
    """``_get_graph_deployment_from_name`` returns the object fetched from the custom API"""
    deployment_name = "test-deployment"
    expected_deployment = {"metadata": {"name": deployment_name}}
    get_object = mock_custom_api.get_namespaced_custom_object
    get_object.return_value = expected_deployment

    fetched = k8s_api._get_graph_deployment_from_name(deployment_name)

    assert fetched == expected_deployment
    # The lookup must address the DynamoGraphDeployment resource in the
    # API object's own namespace, exactly once.
    expected_call = dict(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeployments",
        name=deployment_name,
    )
    get_object.assert_called_once_with(**expected_call)


def test_update_graph_replicas(k8s_api, mock_custom_api):
    """``update_graph_replicas`` patches the replica count of one service"""
    patch_object = mock_custom_api.patch_namespaced_custom_object
    patch_object.return_value = None

    k8s_api.update_graph_replicas("test-deployment", "test-component", 1)

    # Only the targeted service's ``replicas`` field appears in the patch body.
    expected_body = {"spec": {"services": {"test-component": {"replicas": 1}}}}
    patch_object.assert_called_once_with(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeployments",
        name="test-deployment",
        body=expected_body,
    )


@pytest.mark.asyncio
async def test_is_deployment_ready_true(k8s_api, mock_custom_api):
    """Test is_deployment_ready method when deployment is ready"""
    # The deployment dict is handed to is_deployment_ready directly; no API
    # call is mocked or made in this test.
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }

    result = k8s_api.is_deployment_ready(mock_deployment)
    assert result is True


@pytest.mark.asyncio
async def test_is_deployment_ready_false(k8s_api, mock_custom_api):
    """Test is_deployment_ready method when deployment is not ready"""
    # A "Ready" condition whose status is the string "False" must be
    # reported as not ready.
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {
                    "type": "Ready",
                    "status": "False",
                    "message": "Deployment is not ready",
                }
            ]
        }
    }

    result = k8s_api.is_deployment_ready(mock_deployment)
    assert result is False


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_success(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment becomes ready"""
    # Response served by the patched get_graph_deployment: already Ready.
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }

    # Mock the method on the instance
    with patch.object(k8s_api, "get_graph_deployment", return_value=mock_deployment):
        # Test with minimal attempts and delay for faster testing.
        # The test passes if the call completes without raising.
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_timeout(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment times out"""
    # Response served by the patched get_graph_deployment: never Ready.
    mock_deployment: Dict[str, Any] = {
        "status": {
            "conditions": [
                {
                    "type": "Ready",
                    "status": "False",
                    "message": "Deployment is not ready",
                }
            ]
        }
    }

    # Mock the method on the instance
    with patch.object(k8s_api, "get_graph_deployment", return_value=mock_deployment):
        # Test with minimal attempts and delay for faster testing
        with pytest.raises(TimeoutError) as exc_info:
            await k8s_api.wait_for_graph_deployment_ready(
                "test-deployment", max_attempts=2, delay_seconds=0.1
            )

        assert "is not ready after" in str(exc_info.value)


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_not_found(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment is not found"""
    # Every lookup through the mocked custom API fails with HTTP 404.
    mock_custom_api.get_namespaced_custom_object.side_effect = client.ApiException(
        status=404
    )

    # Test with minimal attempts and delay for faster testing
    with pytest.raises(DynamoGraphDeploymentNotFoundError) as exc_info:
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )

    # Validate the exception fields
    exception = exc_info.value
    assert exception.deployment_name == "test-deployment"
    assert exception.namespace == "default"


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_no_conditions(k8s_api, mock_custom_api):
    """Test wait_for_graph_deployment_ready when deployment has no conditions"""
    # Response served by the patched get_graph_deployment: a status with no
    # "conditions" entry at all.
    mock_deployment: Dict[str, Any] = {"status": {}}

    with patch.object(k8s_api, "get_graph_deployment", return_value=mock_deployment):
        # Test with minimal attempts and delay for faster testing
        with pytest.raises(TimeoutError) as exc_info:
            await k8s_api.wait_for_graph_deployment_ready(
                "test-deployment", max_attempts=2, delay_seconds=0.1
            )

        assert "is not ready after" in str(exc_info.value)


@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_on_second_attempt(
    k8s_api, mock_custom_api
):
    """Test wait_for_graph_deployment_ready when deployment becomes ready on second attempt"""
    # Mock the _get_graph_deployment_from_name response to return not ready first, then ready
    mock_deployment_not_ready: Dict[str, Any] = {
        "status": {
            "conditions": [
                {
                    "type": "Ready",
                    "status": "False",
                    "message": "Deployment is not ready",
                }
            ]
        }
    }
    mock_deployment_ready: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }

    # side_effect as a list serves one response per call, in order.
    with patch.object(
        k8s_api,
        "_get_graph_deployment_from_name",
        side_effect=[mock_deployment_not_ready, mock_deployment_ready],
    ):
        # Test with minimal attempts and delay for faster testing.
        # The test passes if the call completes without raising.
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )


@pytest.mark.asyncio
async def test_get_graph_deployment(k8s_api, mock_custom_api):
    """Test get_graph_deployment"""
    mock_deployment = {"metadata": {"name": "parent-dgd"}}

    # Replace the name-based lookup so the test can check both the returned
    # value and the name that get_graph_deployment forwards to it.
    with patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=mock_deployment
    ) as mock_get:
        result = await k8s_api.get_graph_deployment("parent-dgd")

        assert result == mock_deployment
        mock_get.assert_called_once_with("parent-dgd")


@pytest.mark.asyncio
async def test_get_graph_deployment_not_found(k8s_api, mock_custom_api):
    """Test get_graph_deployment when deployment is not found"""
    # Every lookup through the API object's custom_api fails with HTTP 404.
    k8s_api.custom_api.get_namespaced_custom_object.side_effect = client.ApiException(
        status=404
    )
    with pytest.raises(DynamoGraphDeploymentNotFoundError) as exc_info:
        await k8s_api.get_graph_deployment("parent-dgd")

    # Validate the exception fields
    exception = exc_info.value
    assert exception.deployment_name == "parent-dgd"
    assert exception.namespace == "default"