"examples/vscode:/vscode.git/clone" did not exist on "4851c202bbfb5dadc5a3b3b4409d3f083f4e8e90"
Unverified commit 14e1d446, authored by julienmancuso and committed by GitHub
Browse files

fix: add blocking mode for k8s connector in planner (#1176)

parent 3d4fe574
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import asyncio
from typing import Optional from typing import Optional
from kubernetes import client, config from kubernetes import client, config
...@@ -125,3 +126,39 @@ class KubernetesAPI: ...@@ -125,3 +126,39 @@ class KubernetesAPI:
name=graph_deployment_name, name=graph_deployment_name,
body=patch, body=patch,
) )
async def wait_for_graph_deployment_ready(
    self,
    graph_deployment_name: str,
    max_attempts: int = 60,  # default: 10 minutes total
    delay_seconds: float = 10,  # default: check every 10 seconds
) -> None:
    """Poll a graph deployment until its ``Ready`` condition is ``"True"``.

    Each attempt sleeps ``delay_seconds`` and then fetches the deployment
    from ``self.current_namespace`` via ``self.get_graph_deployment``.

    Args:
        graph_deployment_name: Name of the graph deployment to poll.
        max_attempts: Maximum number of polls before giving up.
        delay_seconds: Seconds to sleep before each poll; fractional
            values are accepted.

    Raises:
        ValueError: If a poll returns no deployment (a falsy result).
        TimeoutError: If the deployment is still not ready after
            ``max_attempts`` polls.
    """
    for attempt in range(max_attempts):
        # Sleeping at the top of the loop means the last failed check is
        # followed immediately by the TimeoutError, with no trailing delay.
        await asyncio.sleep(delay_seconds)

        graph_deployment = await self.get_graph_deployment(
            graph_deployment_name, self.current_namespace
        )
        if not graph_deployment:
            raise ValueError(f"Graph deployment {graph_deployment_name} not found")

        # `status` and `conditions` may be missing or explicitly null;
        # either way there is simply no Ready condition to inspect yet.
        status = graph_deployment.get("status") or {}
        conditions = status.get("conditions") or []
        ready_condition = next(
            (c for c in conditions if c.get("type") == "Ready"), None
        )

        if ready_condition and ready_condition.get("status") == "True":
            return  # Deployment is ready

        if ready_condition:
            ready_status = ready_condition.get("status")
            ready_message = ready_condition.get("message")
        else:
            ready_status = "N/A"
            ready_message = "no condition found"
        print(
            f"[Attempt {attempt + 1}/{max_attempts}] "
            f"(status: {ready_status}, "
            f"message: {ready_message})"
        )

    # Raise after all attempts exhausted (without additional delay)
    raise TimeoutError(
        f"Graph deployment '{graph_deployment_name}' "
        f"is not ready after {max_attempts * delay_seconds} seconds"
    )
...@@ -22,7 +22,7 @@ class KubernetesConnector(PlannerConnector): ...@@ -22,7 +22,7 @@ class KubernetesConnector(PlannerConnector):
self.kube_api = KubernetesAPI() self.kube_api = KubernetesAPI()
self.namespace = namespace self.namespace = namespace
async def add_component(self, component_name: str): async def add_component(self, component_name: str, blocking: bool = True):
"""Add a component by increasing its replica count by 1""" """Add a component by increasing its replica count by 1"""
deployment = await self.kube_api.get_graph_deployment( deployment = await self.kube_api.get_graph_deployment(
component_name, self.namespace component_name, self.namespace
...@@ -38,6 +38,10 @@ class KubernetesConnector(PlannerConnector): ...@@ -38,6 +38,10 @@ class KubernetesConnector(PlannerConnector):
component_name, component_name,
current_replicas + 1, current_replicas + 1,
) )
if blocking:
await self.kube_api.wait_for_graph_deployment_ready(
self._get_graph_deployment_name(deployment)
)
async def remove_component(self, component_name: str): async def remove_component(self, component_name: str):
"""Remove a component by decreasing its replica count by 1""" """Remove a component by decreasing its replica count by 1"""
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from dynamo.planner.kube import KubernetesAPI
@pytest.fixture
def mock_config():
    """Yield a patched ``dynamo.planner.kube.config`` for the test's duration.

    ``load_incluster_config`` is replaced with a ``MagicMock`` on the
    patched object before it is handed to the test.
    """
    config_patcher = patch("dynamo.planner.kube.config")
    with config_patcher as patched_config:
        patched_config.load_incluster_config = MagicMock()
        yield patched_config
@pytest.fixture
def mock_custom_api():
    """Patch ``CustomObjectsApi`` in ``dynamo.planner.kube``.

    Yields the patched class's ``return_value``, i.e. the object that
    calling ``CustomObjectsApi()`` produces while the patch is active.
    """
    with patch("dynamo.planner.kube.client.CustomObjectsApi") as api_class:
        api_instance = api_class.return_value
        yield api_instance
@pytest.fixture
def k8s_api(mock_custom_api, mock_config):
    """Return a ``KubernetesAPI`` constructed while both patches are active.

    Requesting ``mock_custom_api`` and ``mock_config`` keeps their patches
    in place for the constructor call.
    """
    api = KubernetesAPI()
    return api
@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_success(k8s_api, mock_custom_api):
    """A deployment that is already Ready is returned after a single poll."""
    ready_condition = {
        "type": "Ready",
        "status": "True",
        "message": "Deployment is ready",
    }
    ready_deployment: Dict[str, Any] = {"status": {"conditions": [ready_condition]}}
    k8s_api.get_graph_deployment = AsyncMock(return_value=ready_deployment)

    # Small attempt count and delay keep the test fast.
    await k8s_api.wait_for_graph_deployment_ready(
        "test-deployment", max_attempts=2, delay_seconds=0.1
    )

    # Exactly one lookup, against the API object's current namespace.
    k8s_api.get_graph_deployment.assert_called_once_with(
        "test-deployment", k8s_api.current_namespace
    )
@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_timeout(k8s_api, mock_custom_api):
    """A deployment that never becomes Ready ends in ``TimeoutError``."""
    not_ready_condition = {
        "type": "Ready",
        "status": "False",
        "message": "Deployment is not ready",
    }
    stuck_deployment: Dict[str, Any] = {
        "status": {"conditions": [not_ready_condition]}
    }
    k8s_api.get_graph_deployment = AsyncMock(return_value=stuck_deployment)

    # Small attempt count and delay keep the test fast.
    with pytest.raises(TimeoutError, match="is not ready after"):
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )

    # Every allowed attempt must have polled the deployment once.
    assert k8s_api.get_graph_deployment.call_count == 2
@pytest.mark.asyncio
async def test_wait_for_graph_deployment_not_found(k8s_api, mock_custom_api):
    """A lookup that yields ``None`` raises ``ValueError`` on the first poll."""
    missing_lookup = AsyncMock(return_value=None)
    k8s_api.get_graph_deployment = missing_lookup

    # Small attempt count and delay keep the test fast.
    with pytest.raises(ValueError, match="not found"):
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )

    # The wait must stop at the first poll instead of retrying.
    assert k8s_api.get_graph_deployment.call_count == 1
@pytest.mark.asyncio
async def test_wait_for_graph_deployment_no_conditions(k8s_api, mock_custom_api):
    """An empty ``status`` (no conditions) is polled until ``TimeoutError``."""
    deployment_without_conditions: Dict[str, Any] = {"status": {}}
    k8s_api.get_graph_deployment = AsyncMock(
        return_value=deployment_without_conditions
    )

    # Small attempt count and delay keep the test fast.
    with pytest.raises(TimeoutError, match="is not ready after"):
        await k8s_api.wait_for_graph_deployment_ready(
            "test-deployment", max_attempts=2, delay_seconds=0.1
        )

    # Both attempts must have been used before giving up.
    assert k8s_api.get_graph_deployment.call_count == 2
@pytest.mark.asyncio
async def test_wait_for_graph_deployment_ready_on_second_attempt(
    k8s_api, mock_custom_api
):
    """The wait returns once a later poll reports Ready after a not-ready one."""
    first_poll: Dict[str, Any] = {
        "status": {
            "conditions": [
                {
                    "type": "Ready",
                    "status": "False",
                    "message": "Deployment is not ready",
                }
            ]
        }
    }
    second_poll: Dict[str, Any] = {
        "status": {
            "conditions": [
                {"type": "Ready", "status": "True", "message": "Deployment is ready"}
            ]
        }
    }
    # side_effect hands out the responses in order, one per call.
    poll_results = [first_poll, second_poll]
    k8s_api.get_graph_deployment = AsyncMock(side_effect=poll_results)

    # Small attempt count and delay keep the test fast.
    await k8s_api.wait_for_graph_deployment_ready(
        "test-deployment", max_attempts=2, delay_seconds=0.1
    )

    assert k8s_api.get_graph_deployment.call_count == 2
...@@ -25,6 +25,7 @@ def mock_kube_api(): ...@@ -25,6 +25,7 @@ def mock_kube_api():
mock_api = Mock() mock_api = Mock()
mock_api.get_graph_deployment = AsyncMock() mock_api.get_graph_deployment = AsyncMock()
mock_api.update_graph_replicas = AsyncMock() mock_api.update_graph_replicas = AsyncMock()
mock_api.wait_for_graph_deployment_ready = AsyncMock()
return mock_api return mock_api
...@@ -41,9 +42,7 @@ def kubernetes_connector(mock_kube_api_class, monkeypatch): ...@@ -41,9 +42,7 @@ def kubernetes_connector(mock_kube_api_class, monkeypatch):
monkeypatch.setattr( monkeypatch.setattr(
"dynamo.planner.kubernetes_connector.KubernetesAPI", mock_kube_api_class "dynamo.planner.kubernetes_connector.KubernetesAPI", mock_kube_api_class
) )
connector = KubernetesConnector() connector = KubernetesConnector("default")
# Set the namespace attribute that's being accessed in the error
connector.namespace = "default"
return connector return connector
...@@ -56,15 +55,20 @@ async def test_add_component_increases_replicas(kubernetes_connector, mock_kube_ ...@@ -56,15 +55,20 @@ async def test_add_component_increases_replicas(kubernetes_connector, mock_kube_
"spec": {"services": {"test-component": {"replicas": 1}}}, "spec": {"services": {"test-component": {"replicas": 1}}},
} }
mock_kube_api.get_graph_deployment.return_value = mock_deployment mock_kube_api.get_graph_deployment.return_value = mock_deployment
mock_kube_api.update_graph_replicas.return_value = None
mock_kube_api.wait_for_graph_deployment_ready.return_value = None
# Act # Act
await kubernetes_connector.add_component(component_name) await kubernetes_connector.add_component(component_name)
# Assert # Assert
mock_kube_api.get_graph_deployment.assert_called_once_with(component_name) mock_kube_api.get_graph_deployment.assert_called_once_with(
component_name, kubernetes_connector.namespace
)
mock_kube_api.update_graph_replicas.assert_called_once_with( mock_kube_api.update_graph_replicas.assert_called_once_with(
"test-graph", component_name, 2 "test-graph", component_name, 2
) )
mock_kube_api.wait_for_graph_deployment_ready.assert_called_once_with("test-graph")
@pytest.mark.asyncio @pytest.mark.asyncio
...@@ -86,6 +90,7 @@ async def test_add_component_with_no_replicas_specified( ...@@ -86,6 +90,7 @@ async def test_add_component_with_no_replicas_specified(
mock_kube_api.update_graph_replicas.assert_called_once_with( mock_kube_api.update_graph_replicas.assert_called_once_with(
"test-graph", component_name, 2 "test-graph", component_name, 2
) )
mock_kube_api.wait_for_graph_deployment_ready.assert_called_once_with("test-graph")
@pytest.mark.asyncio @pytest.mark.asyncio
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment