Unverified commit 9573d34c, authored by Julien Mancuso and committed by GitHub
Browse files

feat: make planner use DGD Scaling Adapters (#4825)


Signed-off-by: Julien Mancuso <jmancuso@nvidia.com>
parent 01bfbea1
......@@ -78,11 +78,48 @@ class KubernetesAPI:
)
raise
def update_graph_replicas(
self, graph_deployment_name: str, component_name: str, replicas: int
def update_service_replicas(
    self, graph_deployment_name: str, service_name: str, replicas: int
) -> None:
    """
    Set the replica count of one service in a DynamoGraphDeployment (DGD).

    The request is first sent to the Scale subresource of the service's
    DynamoGraphDeploymentScalingAdapter (DGDSA). When the API answers 404
    for that adapter, the replica count is patched on the DGD itself so
    that older operators keep working.

    Args:
        graph_deployment_name: Name of the DynamoGraphDeployment
        service_name: Name of the service in DGD.spec.services
        replicas: Desired number of replicas

    Raises:
        client.ApiException: When the Scale request fails with any status
            other than 404.
    """
    # DGDSA naming convention: <dgd-name>-<lowercase-service-name>
    adapter_name = f"{graph_deployment_name}-{service_name.lower()}"
    scale_body = {"spec": {"replicas": replicas}}

    try:
        # Preferred path: patch the adapter's Scale subresource.
        self.custom_api.patch_namespaced_custom_object_scale(
            group="nvidia.com",
            version="v1alpha1",
            namespace=self.current_namespace,
            plural="dynamographdeploymentscalingadapters",
            name=adapter_name,
            body=scale_body,
        )
    except client.ApiException as e:
        # Only "adapter not found" triggers the fallback; everything else
        # is the caller's problem.
        if e.status != 404:
            raise
        logger.info(f"DGDSA {adapter_name} not found, falling back to DGD update")
        self._update_dgd_replicas(graph_deployment_name, service_name, replicas)
    else:
        logger.info(f"Scaled DGDSA {adapter_name} to {replicas} replicas")
def _update_dgd_replicas(
self, graph_deployment_name: str, service_name: str, replicas: int
) -> None:
"""Update the replicas count for a component in a DynamoGraphDeployment"""
patch = {"spec": {"services": {component_name: {"replicas": replicas}}}}
"""Update replicas directly in DGD (fallback for old operators)"""
patch = {"spec": {"services": {service_name: {"replicas": replicas}}}}
self.custom_api.patch_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
......@@ -91,6 +128,20 @@ class KubernetesAPI:
name=graph_deployment_name,
body=patch,
)
logger.info(
f"Updated DGD {graph_deployment_name} service {service_name} to {replicas} replicas"
)
def update_graph_replicas(
    self, graph_deployment_name: str, component_name: str, replicas: int
) -> None:
    """
    Backward-compatible alias for update_service_replicas().

    Deprecated: call update_service_replicas() directly. This wrapper only
    forwards its three arguments, unchanged and in the same order, with
    component_name taking the place of the service name.

    Args:
        graph_deployment_name: Name of the DynamoGraphDeployment
        component_name: Name of the service to scale
        replicas: Desired number of replicas
    """
    scale_service = self.update_service_replicas
    scale_service(graph_deployment_name, component_name, replicas)
def is_deployment_ready(self, deployment: dict) -> bool:
"""Check if a graph deployment is ready"""
......
......@@ -39,6 +39,9 @@ rules:
- apiGroups: ["nvidia.com"]
resources: ["dynamocomponentdeployments", "dynamographdeployments"]
verbs: ["get", "list", "create", "update", "patch"]
# Grants only the "patch" verb on the Scale subresource of
# DynamoGraphDeploymentScalingAdapters (no access to the adapters themselves).
# NOTE(review): presumably required for replica changes made through the
# DGDSA Scale API — confirm against the consumer of this Role.
- apiGroups: ["nvidia.com"]
resources: ["dynamographdeploymentscalingadapters/scale"]
verbs: ["patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
......@@ -68,4 +71,7 @@ rules:
- apiGroups: ["nvidia.com"]
resources: ["dynamocomponentdeployments", "dynamographdeployments"]
verbs: ["get", "list", "create", "update", "patch"]
{{- end }}
\ No newline at end of file
# Grants only the "patch" verb on the Scale subresource of
# DynamoGraphDeploymentScalingAdapters (no access to the adapters themselves).
# NOTE(review): presumably required for replica changes made through the
# DGDSA Scale API — confirm against the consumer of this Role.
- apiGroups: ["nvidia.com"]
resources: ["dynamographdeploymentscalingadapters/scale"]
verbs: ["patch"]
{{- end }}
......@@ -76,11 +76,87 @@ def test_get_graph_deployment_from_name(k8s_api, mock_custom_api):
)
def test_update_graph_replicas(k8s_api, mock_custom_api):
def test_update_service_replicas_uses_dgdsa_scale(k8s_api, mock_custom_api):
    """Scaling goes through the DGDSA Scale API when the adapter exists."""
    scale_api = mock_custom_api.patch_namespaced_custom_object_scale
    scale_api.return_value = None

    k8s_api.update_service_replicas("test-deployment", "Frontend", 3)

    # The adapter name carries the service name in lowercase.
    expected_call = {
        "group": "nvidia.com",
        "version": "v1alpha1",
        "namespace": k8s_api.current_namespace,
        "plural": "dynamographdeploymentscalingadapters",
        "name": "test-deployment-frontend",
        "body": {"spec": {"replicas": 3}},
    }
    scale_api.assert_called_once_with(**expected_call)

    # A successful Scale call must not touch the DGD itself.
    mock_custom_api.patch_namespaced_custom_object.assert_not_called()
def test_update_service_replicas_fallback_to_dgd(k8s_api, mock_custom_api):
    """Test that update_service_replicas falls back to DGD when DGDSA not found"""
    # DGDSA doesn't exist (404)
    mock_custom_api.patch_namespaced_custom_object_scale.side_effect = (
        client.ApiException(status=404)
    )
    mock_custom_api.patch_namespaced_custom_object.return_value = None

    k8s_api.update_service_replicas("test-deployment", "test-component", 1)

    # Should have tried DGDSA first. Pin the full call rather than only the
    # call count, so a wrong adapter name or body on the fallback path is
    # caught instead of being hidden by the 404.
    mock_custom_api.patch_namespaced_custom_object_scale.assert_called_once_with(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeploymentscalingadapters",
        name="test-deployment-test-component",
        body={"spec": {"replicas": 1}},
    )

    # Should fall back to DGD patch
    mock_custom_api.patch_namespaced_custom_object.assert_called_once_with(
        group="nvidia.com",
        version="v1alpha1",
        namespace=k8s_api.current_namespace,
        plural="dynamographdeployments",
        name="test-deployment",
        body={"spec": {"services": {"test-component": {"replicas": 1}}}},
    )
def test_update_service_replicas_propagates_other_errors(k8s_api, mock_custom_api):
    """Any Scale API failure other than 404 reaches the caller unchanged."""
    server_error = client.ApiException(status=500, reason="Internal Server Error")
    mock_custom_api.patch_namespaced_custom_object_scale.side_effect = server_error

    with pytest.raises(client.ApiException) as raised:
        k8s_api.update_service_replicas("test-deployment", "test-component", 1)

    assert raised.value.status == 500
    # A non-404 failure must not trigger the DGD fallback.
    mock_custom_api.patch_namespaced_custom_object.assert_not_called()
def test_update_graph_replicas_calls_update_service_replicas(k8s_api, mock_custom_api):
    """The deprecated update_graph_replicas ends up on the Scale API path."""
    scale_api = mock_custom_api.patch_namespaced_custom_object_scale
    scale_api.return_value = None

    # Go through the deprecated entry point.
    k8s_api.update_graph_replicas("test-deployment", "test-component", 1)

    # Delegation is observed through the Scale call it produces.
    expected_call = {
        "group": "nvidia.com",
        "version": "v1alpha1",
        "namespace": k8s_api.current_namespace,
        "plural": "dynamographdeploymentscalingadapters",
        "name": "test-deployment-test-component",
        "body": {"spec": {"replicas": 1}},
    }
    scale_api.assert_called_once_with(**expected_call)
def test_update_dgd_replicas_directly(k8s_api, mock_custom_api):
"""Test the internal _update_dgd_replicas method"""
mock_custom_api.patch_namespaced_custom_object.return_value = None
k8s_api._update_dgd_replicas("test-deployment", "test-component", 1)
mock_custom_api.patch_namespaced_custom_object.assert_called_once_with(
group="nvidia.com",
version="v1alpha1",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment