Unverified commit 37adc0a8, authored by julienmancuso, committed by GitHub
Browse files

feat: update planner to use DYN_PARENT_DGD_K8S_NAME (#2774)


Signed-off-by: Julien Mancuso <jmancuso@nvidia.com>
parent e28ff8d2
......@@ -14,6 +14,7 @@
# limitations under the License.
import asyncio
import os
from typing import Optional
from kubernetes import client, config
......@@ -53,77 +54,37 @@ class KubernetesAPI:
name=graph_deployment_name,
)
async def get_parent_graph_deployment(self) -> Optional[dict]:
    """
    Get the parent DynamoGraphDeployment using environment variable.

    Uses DYN_PARENT_DGD_K8S_NAME environment variable and assumes the DGD
    is in the same namespace as this component (self.current_namespace).
    The actual fetch is delegated to _get_graph_deployment_from_name.

    Returns:
        The DynamoGraphDeployment object, or None when the env var is
        unset/empty or when the API reports the named DGD as missing (404).

    Raises:
        client.ApiException: for any API failure other than a 404.
    """
    dgd_name = os.getenv("DYN_PARENT_DGD_K8S_NAME")
    # An empty string is treated the same as an unset variable.
    if not dgd_name:
        return None

    try:
        return self._get_graph_deployment_from_name(dgd_name)
    except client.ApiException as e:
        # A missing DGD is an expected outcome and maps to None; every
        # other API error is propagated to the caller untouched.
        if e.status == 404:
            return None
        raise
async def get_graph_deployment(self) -> Optional[dict]:
    """
    Return the parent DynamoGraphDeployment.

    Thin alias: awaits get_parent_graph_deployment() and hands its result
    back unchanged.

    Returns:
        Whatever get_parent_graph_deployment() yields - the
        DynamoGraphDeployment object, or None when it reports no parent.
    """
    parent_dgd = await self.get_parent_graph_deployment()
    return parent_dgd
async def update_graph_replicas(
self, graph_deployment_name: str, component_name: str, replicas: int
) -> None:
......
......@@ -32,13 +32,9 @@ class KubernetesConnector(PlannerConnector):
async def add_component(self, component_name: str, blocking: bool = True):
"""Add a component by increasing its replica count by 1"""
deployment = await self.kube_api.get_graph_deployment(
component_name, self.dynamo_namespace
)
deployment = await self.kube_api.get_graph_deployment()
if deployment is None:
raise ValueError(
f"Graph not found for component {component_name} in dynamo namespace {self.dynamo_namespace}"
)
raise ValueError("Parent DynamoGraphDeployment not found")
# get current replicas or 1 if not found
current_replicas = self._get_current_replicas(deployment, component_name)
......@@ -55,13 +51,9 @@ class KubernetesConnector(PlannerConnector):
async def remove_component(self, component_name: str, blocking: bool = True):
"""Remove a component by decreasing its replica count by 1"""
deployment = await self.kube_api.get_graph_deployment(
component_name, self.dynamo_namespace
)
deployment = await self.kube_api.get_graph_deployment()
if deployment is None:
raise ValueError(
f"Graph {component_name} not found for namespace {self.dynamo_namespace}"
)
raise ValueError("Parent DynamoGraphDeployment not found")
# get current replicas or 1 if not found
current_replicas = self._get_current_replicas(deployment, component_name)
......@@ -76,48 +68,17 @@ class KubernetesConnector(PlannerConnector):
self._get_graph_deployment_name(deployment)
)
async def _validate_components_same_deployment(
self, target_replicas: dict[str, int]
) -> dict:
"""
Validate that all target components belong to the same DynamoGraphDeployment.
"""
async def set_component_replicas(
self, target_replicas: dict[str, int], blocking: bool = True
):
"""Set the replicas for multiple components at once"""
if not target_replicas:
raise ValueError("target_replicas cannot be empty")
# Get deployment for first component
first_component = next(iter(target_replicas))
deployment = await self.kube_api.get_graph_deployment(
first_component, self.dynamo_namespace
)
deployment = await self.kube_api.get_graph_deployment()
if deployment is None:
raise ValueError(
f"Component {first_component} not found in namespace {self.dynamo_namespace}"
)
raise ValueError("Parent DynamoGraphDeployment not found")
# Validate that all components belong to the same DGD
graph_name = deployment["metadata"]["name"]
for component in target_replicas:
comp_deployment = await self.kube_api.get_graph_deployment(
component, self.dynamo_namespace
)
if comp_deployment is None:
raise ValueError(
f"Component {component} not found in namespace {self.dynamo_namespace}"
)
if comp_deployment["metadata"]["name"] != graph_name:
raise ValueError(
f"Component {component} belongs to graph '{comp_deployment['metadata']['name']}' "
f"but expected graph '{graph_name}'. All components must belong to the same GraphDeployment."
)
return deployment
async def set_component_replicas(
self, target_replicas: dict[str, int], blocking: bool = True
):
"""Set the replicas for multiple components at once"""
deployment = await self._validate_components_same_deployment(target_replicas)
if not await self.kube_api.is_deployment_ready(
self._get_graph_deployment_name(deployment)
):
......
......@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Any, Dict
from unittest.mock import MagicMock, patch
......@@ -245,3 +246,40 @@ async def test_wait_for_graph_deployment_ready_on_second_attempt(
await k8s_api.wait_for_graph_deployment_ready(
"test-deployment", max_attempts=2, delay_seconds=0.1
)
@pytest.mark.asyncio
async def test_get_parent_graph_deployment_with_env_var(k8s_api, mock_custom_api):
    """The DGD named by DYN_PARENT_DGD_K8S_NAME is fetched by name and returned."""
    expected_dgd = {"metadata": {"name": "parent-dgd"}}
    env_patch = patch.dict(os.environ, {"DYN_PARENT_DGD_K8S_NAME": "parent-dgd"})
    lookup_patch = patch.object(
        k8s_api, "_get_graph_deployment_from_name", return_value=expected_dgd
    )

    with env_patch, lookup_patch as lookup:
        result = await k8s_api.get_parent_graph_deployment()

    # The by-name helper must be hit exactly once, with the env var's value.
    assert result == expected_dgd
    lookup.assert_called_once_with("parent-dgd")
@pytest.mark.asyncio
async def test_get_parent_graph_deployment_without_env_var(k8s_api, mock_custom_api):
    """With a fully cleared environment, no parent DGD is reported."""
    cleared_env = patch.dict(os.environ, {}, clear=True)

    with cleared_env:
        parent = await k8s_api.get_parent_graph_deployment()

    assert parent is None
@pytest.mark.asyncio
async def test_get_graph_deployment_delegates_to_parent(k8s_api, mock_custom_api):
    """get_graph_deployment forwards to get_parent_graph_deployment and returns its result."""
    expected_dgd = {"metadata": {"name": "parent-dgd"}}
    parent_patch = patch.object(
        k8s_api, "get_parent_graph_deployment", return_value=expected_dgd
    )

    with parent_patch as parent_lookup:
        result = await k8s_api.get_graph_deployment()

    assert result == expected_dgd
    parent_lookup.assert_called_once()
......@@ -26,6 +26,7 @@ def mock_kube_api():
mock_api.get_graph_deployment = AsyncMock()
mock_api.update_graph_replicas = AsyncMock()
mock_api.wait_for_graph_deployment_ready = AsyncMock()
mock_api.is_deployment_ready = AsyncMock()
return mock_api
......@@ -42,7 +43,7 @@ def kubernetes_connector(mock_kube_api_class, monkeypatch):
monkeypatch.setattr(
"dynamo.planner.kubernetes_connector.KubernetesAPI", mock_kube_api_class
)
connector = KubernetesConnector("default")
connector = KubernetesConnector("test-dynamo-namespace", "default")
return connector
......@@ -62,9 +63,7 @@ async def test_add_component_increases_replicas(kubernetes_connector, mock_kube_
await kubernetes_connector.add_component(component_name)
# Assert
mock_kube_api.get_graph_deployment.assert_called_once_with(
component_name, kubernetes_connector.dynamo_namespace
)
mock_kube_api.get_graph_deployment.assert_called_once()
mock_kube_api.update_graph_replicas.assert_called_once_with(
"test-graph", component_name, 2
)
......@@ -100,9 +99,7 @@ async def test_add_component_deployment_not_found(kubernetes_connector, mock_kub
mock_kube_api.get_graph_deployment.return_value = None
# Act & Assert
with pytest.raises(
ValueError, match=f"Graph not found for component {component_name}"
):
with pytest.raises(ValueError, match="Parent DynamoGraphDeployment not found"):
await kubernetes_connector.add_component(component_name)
......@@ -142,3 +139,54 @@ async def test_remove_component_with_zero_replicas(kubernetes_connector, mock_ku
# Assert
mock_kube_api.update_graph_replicas.assert_not_called()
mock_kube_api.wait_for_graph_deployment_ready.assert_not_called()
@pytest.mark.asyncio
async def test_set_component_replicas(kubernetes_connector, mock_kube_api):
    """Setting replicas for two components fetches the graph once and updates each component."""
    # Arrange: a ready graph in which both components currently run 1 replica
    target_replicas = {"component1": 3, "component2": 2}
    services = {name: {"replicas": 1} for name in ("component1", "component2")}
    mock_kube_api.get_graph_deployment.return_value = {
        "metadata": {"name": "test-graph"},
        "spec": {"services": services},
    }
    mock_kube_api.is_deployment_ready.return_value = True
    mock_kube_api.update_graph_replicas.return_value = None
    mock_kube_api.wait_for_graph_deployment_ready.return_value = None

    # Act
    await kubernetes_connector.set_component_replicas(target_replicas)

    # Assert
    mock_kube_api.get_graph_deployment.assert_called_once()
    mock_kube_api.is_deployment_ready.assert_called_once_with("test-graph")
    # One update per component in target_replicas
    assert mock_kube_api.update_graph_replicas.call_count == 2
    mock_kube_api.wait_for_graph_deployment_ready.assert_called_once_with("test-graph")
@pytest.mark.asyncio
async def test_set_component_replicas_deployment_not_found(
    kubernetes_connector, mock_kube_api
):
    """A missing parent graph makes set_component_replicas raise ValueError."""
    # Arrange: the API reports no parent deployment
    mock_kube_api.get_graph_deployment.return_value = None
    requested = {"component1": 3}

    # Act & Assert
    expected_error = pytest.raises(
        ValueError, match="Parent DynamoGraphDeployment not found"
    )
    with expected_error:
        await kubernetes_connector.set_component_replicas(requested)
@pytest.mark.asyncio
async def test_set_component_replicas_empty_target_replicas(
    kubernetes_connector, mock_kube_api
):
    """An empty replica mapping is rejected with ValueError."""
    # Arrange
    no_targets: dict[str, int] = {}

    # Act & Assert
    expected_error = pytest.raises(ValueError, match="target_replicas cannot be empty")
    with expected_error:
        await kubernetes_connector.set_component_replicas(no_targets)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment