Unverified Commit de77d3f9 authored by julienmancuso's avatar julienmancuso Committed by GitHub
Browse files

feat: allow to CRUD dynamo pipelines (#761)

parent e06bfd55
...@@ -13,21 +13,26 @@ ...@@ -13,21 +13,26 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
import uuid
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException, Query
from ..models.schemas import ( from ..models.schemas import (
CreateDeploymentSchema, CreateDeploymentSchema,
DeploymentFullSchema, DeploymentFullSchema,
DeploymentListResponse,
ResourceSchema, ResourceSchema,
create_default_cluster, create_default_cluster,
create_default_user, create_default_user,
) )
from .k8s import create_dynamo_deployment from .k8s import (
create_dynamo_deployment,
delete_dynamo_deployment,
get_dynamo_deployment,
get_namespace,
list_dynamo_deployments,
)
router = APIRouter(prefix="/api/v2/deployments", tags=["deployments"]) router = APIRouter(prefix="/api/v2/deployments", tags=["deployments"])
...@@ -44,19 +49,18 @@ def sanitize_deployment_name(name: Optional[str], dynamo_nim: str) -> str: ...@@ -44,19 +49,18 @@ def sanitize_deployment_name(name: Optional[str], dynamo_nim: str) -> str:
A unique deployment name that is at most 63 characters A unique deployment name that is at most 63 characters
""" """
if name: if name:
# If name is provided, truncate it to 55 chars to leave room for UUID # If name is provided, truncate it to 63
base_name = name[:55] base_name = name[:63]
else: else:
# Generate base name from dynamoNim # Generate base name from dynamoNim
dynamo_nim_parts = dynamo_nim.split(":") dynamo_nim_parts = dynamo_nim.split(":")
if len(dynamo_nim_parts) != 2: if len(dynamo_nim_parts) != 2:
raise ValueError("Invalid dynamoNim format, expected 'name:version'") raise ValueError("Invalid dynamoNim format, expected 'name:version'")
base_name = f"dep-{dynamo_nim_parts[0]}-{dynamo_nim_parts[1]}" base_name = f"dep-{dynamo_nim_parts[0]}-{dynamo_nim_parts[1]}"
# Truncate to 55 chars to leave room for UUID # Truncate to 63 chars
base_name = base_name[:55] base_name = base_name[:63]
# Add UUID and ensure total length is <= 63 return base_name
return f"{base_name}-{uuid.uuid4().hex[:7]}"
@router.post("", response_model=DeploymentFullSchema) @router.post("", response_model=DeploymentFullSchema)
...@@ -75,7 +79,7 @@ async def create_deployment(deployment: CreateDeploymentSchema): ...@@ -75,7 +79,7 @@ async def create_deployment(deployment: CreateDeploymentSchema):
ownership = {"organization_id": "default-org", "user_id": "default-user"} ownership = {"organization_id": "default-org", "user_id": "default-user"}
# Get the k8s namespace from environment variable # Get the k8s namespace from environment variable
kube_namespace = os.getenv("DEFAULT_KUBE_NAMESPACE", "dynamo") kube_namespace = get_namespace()
# Generate deployment name # Generate deployment name
deployment_name = sanitize_deployment_name(deployment.name, deployment.bento) deployment_name = sanitize_deployment_name(deployment.name, deployment.bento)
...@@ -114,7 +118,6 @@ async def create_deployment(deployment: CreateDeploymentSchema): ...@@ -114,7 +118,6 @@ async def create_deployment(deployment: CreateDeploymentSchema):
cluster=cluster, cluster=cluster,
latest_revision=None, latest_revision=None,
manifest=None, manifest=None,
urls=[f"https://{created_crd['metadata']['name']}.dynamo.example.com"],
) )
return deployment_schema return deployment_schema
...@@ -123,3 +126,179 @@ async def create_deployment(deployment: CreateDeploymentSchema): ...@@ -123,3 +126,179 @@ async def create_deployment(deployment: CreateDeploymentSchema):
print("Error creating deployment:") print("Error creating deployment:")
print(e) print(e)
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@router.get("/{name}", response_model=DeploymentFullSchema)
def get_deployment(name: str) -> DeploymentFullSchema:
try:
kube_namespace = get_namespace()
cr = get_dynamo_deployment(
name=name,
namespace=kube_namespace,
)
deployment_schema = DeploymentFullSchema(
name=name,
created_at=cr["metadata"]["creationTimestamp"],
uid=cr["metadata"]["uid"],
resource_type="deployment",
labels=[],
kube_namespace=kube_namespace,
status=get_deployment_status(cr),
urls=get_urls(cr),
creator=create_default_user(),
cluster=create_default_cluster(create_default_user()),
latest_revision=None,
manifest=None,
)
return deployment_schema
except HTTPException as e:
raise e
except Exception as e:
print("Error retrieving deployment:")
print(e)
raise HTTPException(status_code=500, detail=str(e))
# function to look for a condition with type "Ready" in the status of the deployment
# and return the "message" field
def get_deployment_status(resource: Dict[str, Any]) -> str:
# look for a condition with type "Ready" in the status of the deployment
for condition in resource.get("status", {}).get("conditions", []):
if condition.get("type") == "Ready":
return condition.get("message", "unknown")
return "unknown"
def get_urls(resource: Dict[str, Any]) -> List[str]:
urls = []
for condition in resource.get("status", {}).get("conditions", []):
if condition.get("type") == "EndpointExposed":
urls.append(condition.get("message"))
return urls
@router.delete("/{name}", response_model=DeploymentFullSchema)
def delete_deployment(name: str) -> DeploymentFullSchema:
try:
kube_namespace = get_namespace()
# Get deployment details before deletion
cr = get_dynamo_deployment(name, kube_namespace)
deployment_schema = DeploymentFullSchema(
name=name,
created_at=cr["metadata"]["creationTimestamp"],
uid=cr["metadata"]["uid"],
resource_type="deployment",
labels=[],
kube_namespace=kube_namespace,
status=get_deployment_status(cr),
urls=get_urls(cr),
creator=create_default_user(),
cluster=create_default_cluster(create_default_user()),
latest_revision=None,
manifest=None,
)
# Delete the deployment
delete_dynamo_deployment(name, kube_namespace)
return deployment_schema
except HTTPException as e:
raise e
except Exception as e:
print("Error deleting deployment:")
print(e)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/", response_model=DeploymentListResponse)
@router.get("", response_model=DeploymentListResponse)
def list_deployments(
search: str = Query(default="", description="Search query"),
dev: bool = Query(default=False, description="Filter development deployments"),
q: str = Query(default="", description="Advanced query string"),
all: bool = Query(default=False, description="Return all deployments"),
count: str = Query(default="", description="Number of items to return"),
start: str = Query(default="", description="Starting index"),
cluster: str = Query(default="", description="Filter by cluster name"),
) -> Dict[str, Any]:
"""
List all deployments with optional filtering.
Args:
search: Simple text search
dev: Filter development deployments
q: Advanced query string
all: Whether to return all deployments
count: Number of deployments to return
start: Starting index for pagination
cluster: Filter by cluster name
Returns:
Dict containing paginated deployment list
"""
try:
# Convert count and start to integers if they're not empty
count_val = int(count) if count else None
start_val = int(start) if start else None
if count_val is not None and count_val <= 0:
raise HTTPException(status_code=400, detail="Count must be greater than 0")
if start_val is not None and start_val < 0:
raise HTTPException(status_code=400, detail="Start must be non-negative")
kube_namespace = get_namespace()
crs = list_dynamo_deployments(
namespace=kube_namespace,
label_selector=q,
)
deployments = []
for cr in crs:
deployment_schema = DeploymentFullSchema(
name=cr["metadata"]["name"],
created_at=cr["metadata"]["creationTimestamp"],
uid=cr["metadata"]["uid"],
resource_type="deployment",
labels=[],
kube_namespace=kube_namespace,
status=get_deployment_status(cr),
urls=get_urls(cr),
creator=create_default_user(),
cluster=create_default_cluster(create_default_user()),
latest_revision=None,
manifest=None,
)
# Apply cluster filter if provided
if cluster and cluster != deployment_schema.cluster.name:
continue
# Apply search filter if provided
if search and search.lower() not in deployment_schema.name.lower():
continue
# Apply dev filter if enabled and all is not True
if not all and dev and not deployment_schema.name.startswith("dev-"):
continue
deployments.append(deployment_schema)
# Handle pagination
total = len(deployments)
start_idx = start_val if start_val is not None else 0
if count_val is not None:
deployments = deployments[start_idx : start_idx + count_val]
else:
deployments = deployments[start_idx:]
return {
"start": start_idx,
"count": len(deployments),
"total": total,
"items": deployments,
}
except HTTPException as e:
raise e
except Exception as e:
print("Error listing deployments:")
print(e)
raise HTTPException(status_code=500, detail=str(e))
...@@ -13,11 +13,27 @@ ...@@ -13,11 +13,27 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from fastapi import HTTPException
from kubernetes import client, config from kubernetes import client, config
class K8sResource:
def __init__(self, group: str, version: str, plural: str):
self.group = group
self.version = version
self.plural = plural
DynamoDeployment = K8sResource(
group="nvidia.com",
version="v1alpha1",
plural="dynamodeployments",
)
def create_custom_resource( def create_custom_resource(
group: str, version: str, namespace: str, plural: str, body: Dict[str, Any] group: str, version: str, namespace: str, plural: str, body: Dict[str, Any]
) -> Dict[str, Any]: ) -> Dict[str, Any]:
...@@ -73,9 +89,112 @@ def create_dynamo_deployment( ...@@ -73,9 +89,112 @@ def create_dynamo_deployment(
} }
return create_custom_resource( return create_custom_resource(
group="nvidia.com", group=DynamoDeployment.group,
version="v1alpha1", version=DynamoDeployment.version,
namespace=namespace, namespace=namespace,
plural="dynamodeployments", plural=DynamoDeployment.plural,
body=body, body=body,
) )
def get_dynamo_deployment(name: str, namespace: str) -> Dict[str, Any]:
"""
Get a DynamoDeployment custom resource.
Args:
name: Deployment name
namespace: Target namespace
Returns:
Deployment
Raises:
HTTPException: If the deployment is not found or an error occurs
"""
try:
config.load_incluster_config()
except config.config_exception.ConfigException:
config.load_kube_config()
api = client.CustomObjectsApi()
try:
return api.get_namespaced_custom_object(
group=DynamoDeployment.group,
version=DynamoDeployment.version,
namespace=namespace,
plural=DynamoDeployment.plural,
name=name,
)
except client.rest.ApiException as e:
if e.status == 404:
raise HTTPException(status_code=404, detail="Deployment not found")
else:
raise HTTPException(status_code=500, detail=str(e))
def get_namespace() -> str:
"""
Get the namespace from the environment variable.
"""
return os.getenv("DEFAULT_KUBE_NAMESPACE", "dynamo")
def delete_dynamo_deployment(name: str, namespace: str) -> Dict[str, Any]:
"""
Delete a DynamoDeployment custom resource.
"""
try:
config.load_incluster_config()
except config.config_exception.ConfigException:
config.load_kube_config()
api = client.CustomObjectsApi()
try:
return api.delete_namespaced_custom_object(
group=DynamoDeployment.group,
version=DynamoDeployment.version,
namespace=namespace,
plural=DynamoDeployment.plural,
name=name,
)
except client.rest.ApiException as e:
if e.status == 404:
raise HTTPException(status_code=404, detail="Deployment not found")
else:
raise HTTPException(status_code=500, detail=str(e))
def list_dynamo_deployments(
namespace: str,
label_selector: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
List DynamoDeployment custom resources.
Args:
namespace: Target namespace
label_selector: Optional label selector for filtering
Returns:
List of deployments
Raises:
HTTPException: If an error occurs during listing
"""
try:
config.load_incluster_config()
except config.config_exception.ConfigException:
config.load_kube_config()
api = client.CustomObjectsApi()
try:
response = api.list_namespaced_custom_object(
group=DynamoDeployment.group,
version=DynamoDeployment.version,
namespace=namespace,
plural=DynamoDeployment.plural,
label_selector=label_selector,
)
return response["items"]
except client.rest.ApiException as e:
raise HTTPException(status_code=500, detail=str(e))
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .deployments import get_deployment_status, get_urls
def test_get_deployment_status():
# Test case 1: Ready condition present with message
resource = {
"status": {"conditions": [{"type": "Ready", "message": "Deployment is ready"}]}
}
assert get_deployment_status(resource) == "Deployment is ready"
# Test case 2: Ready condition not present
resource = {
"status": {
"conditions": [{"type": "Available", "message": "Some other condition"}]
}
}
assert get_deployment_status(resource) == "unknown"
# Test case 3: Empty conditions list
resource = {"status": {"conditions": []}}
assert get_deployment_status(resource) == "unknown"
# Test case 4: No status field
resource = {}
assert get_deployment_status(resource) == "unknown"
# Test case 5: No conditions field in status
resource = {"status": {}}
assert get_deployment_status(resource) == "unknown"
# Test case 6: Ready condition present without message
resource = {"status": {"conditions": [{"type": "Ready"}]}}
assert get_deployment_status(resource) == "unknown"
def test_get_urls():
resource = {
"status": {
"conditions": [
{"type": "EndpointExposed", "message": "https://example.com"}
]
}
}
assert get_urls(resource) == ["https://example.com"]
...@@ -77,6 +77,13 @@ class DeploymentFullSchema(DeploymentSchema): ...@@ -77,6 +77,13 @@ class DeploymentFullSchema(DeploymentSchema):
urls: List[str] = Field(default_factory=list) urls: List[str] = Field(default_factory=list)
class DeploymentListResponse(BaseModel):
start: int
count: int
total: int
items: List[DeploymentFullSchema]
def create_default_user() -> UserSchema: def create_default_user() -> UserSchema:
"""Create a default user schema for testing/demo purposes.""" """Create a default user schema for testing/demo purposes."""
return UserSchema( return UserSchema(
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
dynamo-operator: dynamo-operator:
natsAddr: "nats://${RELEASE_NAME}-nats:4222" natsAddr: "nats://${RELEASE_NAME}-nats:4222"
etcdAddr: "${RELEASE_NAME}-etcd:2379" etcdAddr: "${RELEASE_NAME}-etcd:2379"
istioVirtualServiceEnabled: false
ingressControllerClassName: ""
namespaceRestriction: namespaceRestriction:
targetNamespace: ${NAMESPACE} targetNamespace: ${NAMESPACE}
controllerManager: controllerManager:
...@@ -29,7 +27,14 @@ dynamo-operator: ...@@ -29,7 +27,14 @@ dynamo-operator:
- name: ${DOCKER_SECRET_NAME} - name: ${DOCKER_SECRET_NAME}
dynamo: dynamo:
dynamoIngressSuffix: ${DYNAMO_INGRESS_SUFFIX} ingress:
enabled: false
className: nginx
tlsSecretName: my-tls-secret
istio:
enabled: false
gateway: istio-system/ingress-alb
ingressHostSuffix: ${DYNAMO_INGRESS_SUFFIX}
dockerRegistry: dockerRegistry:
server: ${PIPELINES_DOCKER_SERVER} server: ${PIPELINES_DOCKER_SERVER}
username: ${PIPELINES_DOCKER_USERNAME} username: ${PIPELINES_DOCKER_USERNAME}
......
...@@ -23,7 +23,7 @@ version: 25.2.0-rc3 ...@@ -23,7 +23,7 @@ version: 25.2.0-rc3
home: https://nvidia.com home: https://nvidia.com
dependencies: dependencies:
- name: dynamo-operator - name: dynamo-operator
version: 0.1.3 version: 0.1.4
repository: file://components/operator repository: file://components/operator
condition: dynamo-operator.enabled condition: dynamo-operator.enabled
- name: dynamo-api-store - name: dynamo-api-store
......
...@@ -27,7 +27,7 @@ type: application ...@@ -27,7 +27,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes # This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version. # to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/) # Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.3 version: 0.1.4
# This is the version number of the application being deployed. This version number should be # This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to # incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using. # follow Semantic Versioning. They should reflect the version the application is using.
...@@ -35,5 +35,5 @@ version: 0.1.3 ...@@ -35,5 +35,5 @@ version: 0.1.3
appVersion: "0.1.0" appVersion: "0.1.0"
dependencies: dependencies:
- name: dynamo-crds - name: dynamo-crds
version: 0.1.2 version: 0.1.3
repository: file://charts/dynamo-crds repository: file://charts/dynamo-crds
\ No newline at end of file
...@@ -16,5 +16,5 @@ apiVersion: v2 ...@@ -16,5 +16,5 @@ apiVersion: v2
name: dynamo-crds name: dynamo-crds
description: A Helm chart for CRDs of dynamo operator description: A Helm chart for CRDs of dynamo operator
type: application type: application
version: 0.1.2 version: 0.1.3
dependencies: [] dependencies: []
\ No newline at end of file
...@@ -1748,8 +1748,14 @@ spec: ...@@ -1748,8 +1748,14 @@ spec:
type: object type: object
enabled: enabled:
type: boolean type: boolean
host:
type: string
hostPrefix: hostPrefix:
type: string type: string
hostSuffix:
type: string
ingressControllerClassName:
type: string
labels: labels:
additionalProperties: additionalProperties:
type: string type: string
...@@ -1761,6 +1767,8 @@ spec: ...@@ -1761,6 +1767,8 @@ spec:
type: object type: object
useVirtualService: useVirtualService:
type: boolean type: boolean
virtualServiceGateway:
type: string
type: object type: object
labels: labels:
additionalProperties: additionalProperties:
......
...@@ -1693,8 +1693,14 @@ spec: ...@@ -1693,8 +1693,14 @@ spec:
type: object type: object
enabled: enabled:
type: boolean type: boolean
host:
type: string
hostPrefix: hostPrefix:
type: string type: string
hostSuffix:
type: string
ingressControllerClassName:
type: string
labels: labels:
additionalProperties: additionalProperties:
type: string type: string
...@@ -1706,6 +1712,8 @@ spec: ...@@ -1706,6 +1712,8 @@ spec:
type: object type: object
useVirtualService: useVirtualService:
type: boolean type: boolean
virtualServiceGateway:
type: string
type: object type: object
labels: labels:
additionalProperties: additionalProperties:
......
...@@ -72,12 +72,21 @@ spec: ...@@ -72,12 +72,21 @@ spec:
{{- if .Values.etcdAddr }} {{- if .Values.etcdAddr }}
- --etcdAddr={{ .Values.etcdAddr }} - --etcdAddr={{ .Values.etcdAddr }}
{{- end }} {{- end }}
{{- if .Values.istioVirtualServiceEnabled }} {{- if and .Values.dynamo.istio.enabled .Values.dynamo.istio.gateway }}
- --istio-virtual-service-enabled - --istio-virtual-service-gateway={{ .Values.dynamo.istio.gateway }}
{{- end }} {{- end }}
{{- if .Values.ingressControllerClassName }} {{- if .Values.dynamo.ingress.enabled }}
- --ingress-controller-class-name={{ .Values.ingressControllerClassName }} {{- if .Values.dynamo.ingress.className }}
- --ingress-controller-class-name={{ .Values.dynamo.ingress.className }}
{{- end }} {{- end }}
{{- if .Values.dynamo.ingress.tlsSecretName }}
- --ingress-controller-tls-secret-name={{ .Values.dynamo.ingress.tlsSecretName }}
{{- end }}
{{- end }}
{{- if .Values.dynamo.ingressHostSuffix }}
- --ingress-host-suffix={{ .Values.dynamo.ingressHostSuffix }}
{{- end }}
command: command:
- /manager - /manager
env: env:
...@@ -109,4 +118,4 @@ spec: ...@@ -109,4 +118,4 @@ spec:
securityContext: securityContext:
runAsNonRoot: true runAsNonRoot: true
serviceAccountName: {{ include "dynamo-operator.fullname" . }}-controller-manager serviceAccountName: {{ include "dynamo-operator.fullname" . }}-controller-manager
terminationGracePeriodSeconds: 10 terminationGracePeriodSeconds: 10
\ No newline at end of file
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
package v1alpha1 package v1alpha1
import ( import (
"fmt"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
...@@ -84,3 +86,33 @@ func (s *DynamoDeployment) GetSpec() any { ...@@ -84,3 +86,33 @@ func (s *DynamoDeployment) GetSpec() any {
func (s *DynamoDeployment) SetSpec(spec any) { func (s *DynamoDeployment) SetSpec(spec any) {
s.Spec = spec.(DynamoDeploymentSpec) s.Spec = spec.(DynamoDeploymentSpec)
} }
func (s *DynamoDeployment) SetEndpointStatus(isSecured bool, endpointHost string) {
protocol := "http"
if isSecured {
protocol = "https"
}
s.AddStatusCondition(metav1.Condition{
Type: "EndpointExposed",
Status: metav1.ConditionTrue,
Reason: "EndpointExposed",
Message: fmt.Sprintf("%s://%s", protocol, endpointHost),
LastTransitionTime: metav1.Now(),
})
}
func (s *DynamoDeployment) AddStatusCondition(condition metav1.Condition) {
if s.Status.Conditions == nil {
s.Status.Conditions = []metav1.Condition{}
}
// Check if condition with same type already exists
for i, existingCondition := range s.Status.Conditions {
if existingCondition.Type == condition.Type {
// Replace the existing condition
s.Status.Conditions[i] = condition
return
}
}
// If no matching condition found, append the new one
s.Status.Conditions = append(s.Status.Conditions, condition)
}
...@@ -96,12 +96,16 @@ type IngressTLSSpec struct { ...@@ -96,12 +96,16 @@ type IngressTLSSpec struct {
} }
type IngressSpec struct { type IngressSpec struct {
Enabled bool `json:"enabled,omitempty"` Enabled bool `json:"enabled,omitempty"`
UseVirtualService *bool `json:"useVirtualService,omitempty"` Host string `json:"host,omitempty"`
HostPrefix *string `json:"hostPrefix,omitempty"` UseVirtualService bool `json:"useVirtualService,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"` VirtualServiceGateway *string `json:"virtualServiceGateway,omitempty"`
Labels map[string]string `json:"labels,omitempty"` HostPrefix *string `json:"hostPrefix,omitempty"`
TLS *IngressTLSSpec `json:"tls,omitempty"` Annotations map[string]string `json:"annotations,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
TLS *IngressTLSSpec `json:"tls,omitempty"`
HostSuffix *string `json:"hostSuffix,omitempty"`
IngressControllerClassName *string `json:"ingressControllerClassName,omitempty"`
} }
// DynamoNimDeploymentStatus defines the observed state of DynamoNimDeployment // DynamoNimDeploymentStatus defines the observed state of DynamoNimDeployment
......
//go:build !ignore_autogenerated //go:build !ignore_autogenerated
/* /*
* SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech, Inc SPDX-License-Identifier: Apache-2.0
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
* you may not use this file except in compliance with the License. You may obtain a copy of the License at
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and
* See the License for the specific language governing permissions and limitations under the License.
* limitations under the License. */
* Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
*/ /*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by controller-gen. DO NOT EDIT. // Code generated by controller-gen. DO NOT EDIT.
...@@ -755,9 +769,9 @@ func (in *ExternalService) DeepCopy() *ExternalService { ...@@ -755,9 +769,9 @@ func (in *ExternalService) DeepCopy() *ExternalService {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *IngressSpec) DeepCopyInto(out *IngressSpec) { func (in *IngressSpec) DeepCopyInto(out *IngressSpec) {
*out = *in *out = *in
if in.UseVirtualService != nil { if in.VirtualServiceGateway != nil {
in, out := &in.UseVirtualService, &out.UseVirtualService in, out := &in.VirtualServiceGateway, &out.VirtualServiceGateway
*out = new(bool) *out = new(string)
**out = **in **out = **in
} }
if in.HostPrefix != nil { if in.HostPrefix != nil {
...@@ -784,6 +798,16 @@ func (in *IngressSpec) DeepCopyInto(out *IngressSpec) { ...@@ -784,6 +798,16 @@ func (in *IngressSpec) DeepCopyInto(out *IngressSpec) {
*out = new(IngressTLSSpec) *out = new(IngressTLSSpec)
**out = **in **out = **in
} }
if in.HostSuffix != nil {
in, out := &in.HostSuffix, &out.HostSuffix
*out = new(string)
**out = **in
}
if in.IngressControllerClassName != nil {
in, out := &in.IngressControllerClassName, &out.IngressControllerClassName
*out = new(string)
**out = **in
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IngressSpec. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IngressSpec.
......
...@@ -70,8 +70,10 @@ func main() { ...@@ -70,8 +70,10 @@ func main() {
var leaderElectionID string var leaderElectionID string
var natsAddr string var natsAddr string
var etcdAddr string var etcdAddr string
var istioVirtualServiceEnabled bool var istioVirtualServiceGateway string
var ingressControllerClassName string var ingressControllerClassName string
var ingressControllerTLSSecretName string
var ingressHostSuffix string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false, flag.BoolVar(&enableLeaderElection, "leader-elect", false,
...@@ -87,10 +89,14 @@ func main() { ...@@ -87,10 +89,14 @@ func main() {
"Id to use for the leader election.") "Id to use for the leader election.")
flag.StringVar(&natsAddr, "natsAddr", "", "address of the NATS server") flag.StringVar(&natsAddr, "natsAddr", "", "address of the NATS server")
flag.StringVar(&etcdAddr, "etcdAddr", "", "address of the etcd server") flag.StringVar(&etcdAddr, "etcdAddr", "", "address of the etcd server")
flag.BoolVar(&istioVirtualServiceEnabled, "istio-virtual-service-enabled", false, flag.StringVar(&istioVirtualServiceGateway, "istio-virtual-service-gateway", "",
"If set, the istio virtual service will be enabled for the ingress") "The name of the istio virtual service gateway to use")
flag.StringVar(&ingressControllerClassName, "ingress-controller-class-name", "", flag.StringVar(&ingressControllerClassName, "ingress-controller-class-name", "",
"The name of the ingress controller class to use") "The name of the ingress controller class to use")
flag.StringVar(&ingressControllerTLSSecretName, "ingress-controller-tls-secret-name", "",
"The name of the ingress controller TLS secret to use")
flag.StringVar(&ingressHostSuffix, "ingress-host-suffix", "",
"The suffix to use for the ingress host")
opts := zap.Options{ opts := zap.Options{
Development: true, Development: true,
} }
...@@ -171,15 +177,14 @@ func main() { ...@@ -171,15 +177,14 @@ func main() {
os.Exit(1) os.Exit(1)
} }
if err = (&controller.DynamoNimDeploymentReconciler{ if err = (&controller.DynamoNimDeploymentReconciler{
Client: mgr.GetClient(), Client: mgr.GetClient(),
Scheme: mgr.GetScheme(), Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("yatai-deployment"), Recorder: mgr.GetEventRecorderFor("yatai-deployment"),
Config: ctrlConfig, Config: ctrlConfig,
NatsAddr: natsAddr, NatsAddr: natsAddr,
EtcdAddr: etcdAddr, EtcdAddr: etcdAddr,
EtcdStorage: etcd.NewStorage(cli), EtcdStorage: etcd.NewStorage(cli),
IstioVirtualServiceEnabled: istioVirtualServiceEnabled, UseVirtualService: istioVirtualServiceGateway != "",
IngressControllerClassName: ingressControllerClassName,
}).SetupWithManager(mgr); err != nil { }).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamoNimDeployment") setupLog.Error(err, "unable to create controller", "controller", "DynamoNimDeployment")
os.Exit(1) os.Exit(1)
...@@ -194,10 +199,14 @@ func main() { ...@@ -194,10 +199,14 @@ func main() {
os.Exit(1) os.Exit(1)
} }
if err = (&controller.DynamoDeploymentReconciler{ if err = (&controller.DynamoDeploymentReconciler{
Client: mgr.GetClient(), Client: mgr.GetClient(),
Scheme: mgr.GetScheme(), Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("dynamodeployment"), Recorder: mgr.GetEventRecorderFor("dynamodeployment"),
Config: ctrlConfig, Config: ctrlConfig,
VirtualServiceGateway: istioVirtualServiceGateway,
IngressControllerClassName: ingressControllerClassName,
IngressControllerTLSSecret: ingressControllerTLSSecretName,
IngressHostSuffix: ingressHostSuffix,
}).SetupWithManager(mgr); err != nil { }).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamoDeployment") setupLog.Error(err, "unable to create controller", "controller", "DynamoDeployment")
os.Exit(1) os.Exit(1)
......
...@@ -1748,8 +1748,14 @@ spec: ...@@ -1748,8 +1748,14 @@ spec:
type: object type: object
enabled: enabled:
type: boolean type: boolean
host:
type: string
hostPrefix: hostPrefix:
type: string type: string
hostSuffix:
type: string
ingressControllerClassName:
type: string
labels: labels:
additionalProperties: additionalProperties:
type: string type: string
...@@ -1761,6 +1767,8 @@ spec: ...@@ -1761,6 +1767,8 @@ spec:
type: object type: object
useVirtualService: useVirtualService:
type: boolean type: boolean
virtualServiceGateway:
type: string
type: object type: object
labels: labels:
additionalProperties: additionalProperties:
......
...@@ -1693,8 +1693,14 @@ spec: ...@@ -1693,8 +1693,14 @@ spec:
type: object type: object
enabled: enabled:
type: boolean type: boolean
host:
type: string
hostPrefix: hostPrefix:
type: string type: string
hostSuffix:
type: string
ingressControllerClassName:
type: string
labels: labels:
additionalProperties: additionalProperties:
type: string type: string
...@@ -1706,6 +1712,8 @@ spec: ...@@ -1706,6 +1712,8 @@ spec:
type: object type: object
useVirtualService: useVirtualService:
type: boolean type: boolean
virtualServiceGateway:
type: string
type: object type: object
labels: labels:
additionalProperties: additionalProperties:
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package controller package controller
import ( import (
"fmt"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
...@@ -76,3 +78,15 @@ func getPvcName(crd metav1.Object, defaultName *string) string { ...@@ -76,3 +78,15 @@ func getPvcName(crd metav1.Object, defaultName *string) string {
} }
return crd.GetName() return crd.GetName()
} }
func getIngressHost(ingressSpec v1alpha1.IngressSpec) string {
host := ingressSpec.Host
if ingressSpec.HostPrefix != nil {
host = *ingressSpec.HostPrefix + host
}
ingressSuffix := DefaultIngressSuffix
if ingressSpec.HostSuffix != nil {
ingressSuffix = *ingressSpec.HostSuffix
}
return fmt.Sprintf("%s.%s", host, ingressSuffix)
}
...@@ -53,9 +53,13 @@ type etcdStorage interface { ...@@ -53,9 +53,13 @@ type etcdStorage interface {
// DynamoDeploymentReconciler reconciles a DynamoDeployment object // DynamoDeploymentReconciler reconciles a DynamoDeployment object
type DynamoDeploymentReconciler struct { type DynamoDeploymentReconciler struct {
client.Client client.Client
Scheme *runtime.Scheme Scheme *runtime.Scheme
Config commonController.Config Config commonController.Config
Recorder record.EventRecorder Recorder record.EventRecorder
VirtualServiceGateway string
IngressControllerClassName string
IngressControllerTLSSecret string
IngressHostSuffix string
} }
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamodeployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=nvidia.com,resources=dynamodeployments,verbs=get;list;watch;create;update;patch;delete
...@@ -94,15 +98,13 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -94,15 +98,13 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
message = err.Error() message = err.Error()
} }
// update the CRD status condition // update the CRD status condition
dynamoDeployment.Status.Conditions = []metav1.Condition{ dynamoDeployment.AddStatusCondition(metav1.Condition{
{ Type: "Ready",
Type: "Ready", Status: readyStatus,
Status: readyStatus, Reason: reason,
Reason: reason, Message: message,
Message: message, LastTransitionTime: metav1.Now(),
LastTransitionTime: metav1.Now(), })
},
}
err = r.Status().Update(ctx, dynamoDeployment) err = r.Status().Update(ctx, dynamoDeployment)
if err != nil { if err != nil {
logger.Error(err, "Unable to update the CRD status", "crd", req.NamespacedName) logger.Error(err, "Unable to update the CRD status", "crd", req.NamespacedName)
...@@ -127,7 +129,7 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -127,7 +129,7 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
} }
// generate the DynamoNimDeployments from the config // generate the DynamoNimDeployments from the config
dynamoNimDeployments, err := nim.GenerateDynamoNIMDeployments(ctx, dynamoDeployment, dynamoNIMConfig) dynamoNimDeployments, err := nim.GenerateDynamoNIMDeployments(ctx, dynamoDeployment, dynamoNIMConfig, r.generateDefaultIngressSpec(dynamoDeployment))
if err != nil { if err != nil {
reason = "failed_to_generate_the_DynamoNimDeployments" reason = "failed_to_generate_the_DynamoNimDeployments"
return ctrl.Result{}, err return ctrl.Result{}, err
...@@ -142,6 +144,9 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -142,6 +144,9 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err return ctrl.Result{}, err
} }
} }
if deployment.Spec.Ingress.Enabled {
dynamoDeployment.SetEndpointStatus((r.isEndpointSecured()), getIngressHost(deployment.Spec.Ingress))
}
} }
// Set common env vars on each of the dynamoNimDeployments // Set common env vars on each of the dynamoNimDeployments
...@@ -203,6 +208,33 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -203,6 +208,33 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
} }
func (r *DynamoDeploymentReconciler) generateDefaultIngressSpec(dynamoDeployment *nvidiacomv1alpha1.DynamoDeployment) *nvidiacomv1alpha1.IngressSpec {
res := &nvidiacomv1alpha1.IngressSpec{
Enabled: r.VirtualServiceGateway != "" || r.IngressControllerClassName != "",
Host: dynamoDeployment.Name,
UseVirtualService: r.VirtualServiceGateway != "",
}
if r.IngressControllerClassName != "" {
res.IngressControllerClassName = &r.IngressControllerClassName
}
if r.IngressControllerTLSSecret != "" {
res.TLS = &nvidiacomv1alpha1.IngressTLSSpec{
SecretName: r.IngressControllerTLSSecret,
}
}
if r.IngressHostSuffix != "" {
res.HostSuffix = &r.IngressHostSuffix
}
if r.VirtualServiceGateway != "" {
res.VirtualServiceGateway = &r.VirtualServiceGateway
}
return res
}
func (r *DynamoDeploymentReconciler) isEndpointSecured() bool {
return r.IngressControllerTLSSecret != ""
}
func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar { func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
envMap := make(map[string]corev1.EnvVar) envMap := make(map[string]corev1.EnvVar)
......
...@@ -93,14 +93,13 @@ var ServicePortHTTPNonProxy = commonconsts.BentoServicePort + 1 ...@@ -93,14 +93,13 @@ var ServicePortHTTPNonProxy = commonconsts.BentoServicePort + 1
// DynamoNimDeploymentReconciler reconciles a DynamoNimDeployment object // DynamoNimDeploymentReconciler reconciles a DynamoNimDeployment object
type DynamoNimDeploymentReconciler struct { type DynamoNimDeploymentReconciler struct {
client.Client client.Client
Scheme *runtime.Scheme Scheme *runtime.Scheme
Recorder record.EventRecorder Recorder record.EventRecorder
Config controller_common.Config Config controller_common.Config
NatsAddr string NatsAddr string
EtcdAddr string EtcdAddr string
EtcdStorage etcdStorage EtcdStorage etcdStorage
IngressControllerClassName string UseVirtualService bool
IstioVirtualServiceEnabled bool
} }
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamonimdeployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=nvidia.com,resources=dynamonimdeployments,verbs=get;list;watch;create;update;patch;delete
...@@ -818,16 +817,17 @@ func (r *DynamoNimDeploymentReconciler) generateIngress(ctx context.Context, opt ...@@ -818,16 +817,17 @@ func (r *DynamoNimDeploymentReconciler) generateIngress(ctx context.Context, opt
}, },
} }
if !opt.dynamoNimDeployment.Spec.Ingress.Enabled || r.IngressControllerClassName == "" { if !opt.dynamoNimDeployment.Spec.Ingress.Enabled || opt.dynamoNimDeployment.Spec.Ingress.IngressControllerClassName == nil {
log.Info("Ingress is not enabled") log.Info("Ingress is not enabled")
return ingress, true, nil return ingress, true, nil
} }
host := getIngressHost(opt.dynamoNimDeployment.Spec.Ingress)
ingress.Spec = networkingv1.IngressSpec{ ingress.Spec = networkingv1.IngressSpec{
IngressClassName: &r.IngressControllerClassName, IngressClassName: opt.dynamoNimDeployment.Spec.Ingress.IngressControllerClassName,
Rules: []networkingv1.IngressRule{ Rules: []networkingv1.IngressRule{
{ {
Host: getIngressHost(opt.dynamoNimDeployment), Host: host,
IngressRuleValue: networkingv1.IngressRuleValue{ IngressRuleValue: networkingv1.IngressRuleValue{
HTTP: &networkingv1.HTTPIngressRuleValue{ HTTP: &networkingv1.HTTPIngressRuleValue{
Paths: []networkingv1.HTTPIngressPath{ Paths: []networkingv1.HTTPIngressPath{
...@@ -850,6 +850,15 @@ func (r *DynamoNimDeploymentReconciler) generateIngress(ctx context.Context, opt ...@@ -850,6 +850,15 @@ func (r *DynamoNimDeploymentReconciler) generateIngress(ctx context.Context, opt
}, },
} }
if opt.dynamoNimDeployment.Spec.Ingress.TLS != nil {
ingress.Spec.TLS = []networkingv1.IngressTLS{
{
Hosts: []string{host},
SecretName: opt.dynamoNimDeployment.Spec.Ingress.TLS.SecretName,
},
}
}
return ingress, false, nil return ingress, false, nil
} }
...@@ -864,7 +873,7 @@ func (r *DynamoNimDeploymentReconciler) generateVirtualService(ctx context.Conte ...@@ -864,7 +873,7 @@ func (r *DynamoNimDeploymentReconciler) generateVirtualService(ctx context.Conte
}, },
} }
vsEnabled := opt.dynamoNimDeployment.Spec.Ingress.Enabled && r.IstioVirtualServiceEnabled vsEnabled := opt.dynamoNimDeployment.Spec.Ingress.Enabled && opt.dynamoNimDeployment.Spec.Ingress.UseVirtualService && opt.dynamoNimDeployment.Spec.Ingress.VirtualServiceGateway != nil
if !vsEnabled { if !vsEnabled {
log.Info("VirtualService is not enabled") log.Info("VirtualService is not enabled")
return vs, true, nil return vs, true, nil
...@@ -872,9 +881,9 @@ func (r *DynamoNimDeploymentReconciler) generateVirtualService(ctx context.Conte ...@@ -872,9 +881,9 @@ func (r *DynamoNimDeploymentReconciler) generateVirtualService(ctx context.Conte
vs.Spec = istioNetworking.VirtualService{ vs.Spec = istioNetworking.VirtualService{
Hosts: []string{ Hosts: []string{
getIngressHost(opt.dynamoNimDeployment), getIngressHost(opt.dynamoNimDeployment.Spec.Ingress),
}, },
Gateways: []string{"istio-system/ingress-alb"}, Gateways: []string{*opt.dynamoNimDeployment.Spec.Ingress.VirtualServiceGateway},
Http: []*istioNetworking.HTTPRoute{ Http: []*istioNetworking.HTTPRoute{
{ {
Match: []*istioNetworking.HTTPMatchRequest{ Match: []*istioNetworking.HTTPMatchRequest{
...@@ -1058,18 +1067,6 @@ type generateResourceOption struct { ...@@ -1058,18 +1067,6 @@ type generateResourceOption struct {
isGenericService bool isGenericService bool
} }
func getIngressHost(dynamoNimDeployment *v1alpha1.DynamoNimDeployment) string {
vsName := dynamoNimDeployment.Name
if dynamoNimDeployment.Spec.Ingress.HostPrefix != nil {
vsName = *dynamoNimDeployment.Spec.Ingress.HostPrefix + vsName
}
ingressSuffix, found := os.LookupEnv("DYNAMO_INGRESS_SUFFIX")
if !found || ingressSuffix == "" {
ingressSuffix = DefaultIngressSuffix
}
return fmt.Sprintf("%s.%s", vsName, ingressSuffix)
}
func (r *DynamoNimDeploymentReconciler) generateHPA(ctx context.Context, opt generateResourceOption) (*autoscalingv2.HorizontalPodAutoscaler, bool, error) { func (r *DynamoNimDeploymentReconciler) generateHPA(ctx context.Context, opt generateResourceOption) (*autoscalingv2.HorizontalPodAutoscaler, bool, error) {
labels := r.getKubeLabels(opt.dynamoNimDeployment, opt.dynamoNim) labels := r.getKubeLabels(opt.dynamoNimDeployment, opt.dynamoNim)
...@@ -2161,7 +2158,7 @@ func (r *DynamoNimDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error ...@@ -2161,7 +2158,7 @@ func (r *DynamoNimDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error
return reqs return reqs
})) }))
if r.IstioVirtualServiceEnabled { if r.UseVirtualService {
m.Owns(&networkingv1beta1.VirtualService{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})) m.Owns(&networkingv1beta1.VirtualService{}, builder.WithPredicates(predicate.GenerationChangedPredicate{}))
} }
m.Owns(&autoscalingv2.HorizontalPodAutoscaler{}) m.Owns(&autoscalingv2.HorizontalPodAutoscaler{})
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment