Commit 044e12e1 authored by julienmancuso, committed by GitHub
Browse files

fix: fix dynamoNimDeployment status (#551)

parent 0ff6f53d
......@@ -89,7 +89,7 @@ envsubst '${NAMESPACE} ${NGC_TOKEN} ${CI_COMMIT_SHA} ${RELEASE_NAME} ${DYNAMO_IN
echo ""
echo "Generated values file saved as generated-values.yaml"
Build dependencies before installation
# Build dependencies before installation
echo "Building helm dependencies..."
cd platform
retry_command "$HELM_CMD dep build" 5 5
......
......@@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"strings"
"dario.cat/mergo"
......@@ -70,6 +71,7 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
var err error
reason := "undefined"
message := ""
readyStatus := metav1.ConditionFalse
// retrieve the CRD
dynamoDeployment := &nvidiacomv1alpha1.DynamoDeployment{}
......@@ -82,7 +84,6 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
defer func() {
message := ""
if err != nil {
dynamoDeployment.SetState(FailedState)
message = err.Error()
......@@ -149,7 +150,7 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}
allAreReady := true
notReadyDeployments := []string{}
// reconcile the DynamoNimDeployments
for serviceName, dynamoNimDeployment := range dynamoNimDeployments {
logger.Info("Reconciling the DynamoNimDeployment", "serviceName", serviceName, "dynamoNimDeployment", dynamoNimDeployment)
......@@ -163,13 +164,17 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}
if !dynamoNimDeployment.Status.IsReady() {
allAreReady = false
notReadyDeployments = append(notReadyDeployments, dynamoNimDeployment.Name)
}
}
if allAreReady {
if len(notReadyDeployments) == 0 {
dynamoDeployment.SetState(ReadyState)
reason = "all_deployments_are_ready"
message = "All deployments are ready"
readyStatus = metav1.ConditionTrue
} else {
reason = "some_deployments_are_not_ready"
message = fmt.Sprintf("The following deployments are not ready: %v", notReadyDeployments)
dynamoDeployment.SetState(PendingState)
}
......
......@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"sort"
"strconv"
"strings"
......@@ -37,7 +38,6 @@ import (
"emperror.dev/errors"
dynamoCommon "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemas"
yataiclient "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/yatai-client"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1"
commonconfig "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/config"
commonconsts "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/consts"
......@@ -45,7 +45,6 @@ import (
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/envoy"
"github.com/cisco-open/k8s-objectmatcher/patch"
"github.com/huandu/xstrings"
"github.com/jinzhu/copier"
istioNetworking "istio.io/api/networking/v1beta1"
networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
......@@ -59,7 +58,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
......@@ -188,12 +189,6 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
}
}()
yataiClient, clusterName, err := r.getYataiClientWithAuth(ctx, dynamoNimDeployment)
if err != nil {
err = errors.Wrap(err, "get yatai client with auth")
return
}
dynamoNimFoundCondition := meta.FindStatusCondition(dynamoNimDeployment.Status.Conditions, v1alpha1.DynamoDeploymentConditionTypeDynamoNimFound)
if dynamoNimFoundCondition != nil && dynamoNimFoundCondition.Status == metav1.ConditionUnknown {
logs.Info(fmt.Sprintf("Getting Dynamo NIM %s", dynamoNimDeployment.Spec.DynamoNim))
......@@ -335,11 +330,9 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
}
// create or update api-server deployment
modified_, err := r.createOrUpdateOrDeleteDeployments(ctx, createOrUpdateOrDeleteDeploymentsOption{
yataiClient: yataiClient,
modified_, deployment, err := r.createOrUpdateOrDeleteDeployments(ctx, generateResourceOption{
dynamoNimDeployment: dynamoNimDeployment,
dynamoNim: dynamoNimCR,
clusterName: clusterName,
})
if err != nil {
return
......@@ -350,7 +343,10 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
}
// create or update api-server hpa
modified_, err = r.createOrUpdateHPA(ctx, dynamoNimDeployment, dynamoNimCR)
modified_, _, err = createOrUpdateResource(ctx, r, generateResourceOption{
dynamoNimDeployment: dynamoNimDeployment,
dynamoNim: dynamoNimCR,
}, r.generateHPA)
if err != nil {
return
}
......@@ -360,7 +356,7 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
}
// create or update api-server service
modified_, err = r.createOrUpdateOrDeleteServices(ctx, createOrUpdateOrDeleteServicesOption{
modified_, err = r.createOrUpdateOrDeleteServices(ctx, generateResourceOption{
dynamoNimDeployment: dynamoNimDeployment,
dynamoNim: dynamoNimCR,
})
......@@ -373,11 +369,10 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
}
// create or update api-server ingresses
modified_, err = r.createOrUpdateIngresses(ctx, createOrUpdateIngressOption{
yataiClient: yataiClient,
modified_, _, err = createOrUpdateResource(ctx, r, generateResourceOption{
dynamoNimDeployment: dynamoNimDeployment,
dynamoNim: dynamoNimCR,
})
}, r.generateVirtualService)
if err != nil {
return
}
......@@ -392,15 +387,76 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
logs.Info("Finished reconciling.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "Update", "All resources updated!")
dynamoNimDeployment, err = r.setStatusConditions(ctx, req,
err = r.computeAvailableStatusCondition(ctx, req, deployment)
return
}
func (r *DynamoNimDeploymentReconciler) computeAvailableStatusCondition(ctx context.Context, req ctrl.Request, deployment *appsv1.Deployment) error {
logs := log.FromContext(ctx)
if IsDeploymentReady(deployment) {
logs.Info("Deployment is ready. Setting available status condition to true.")
_, err := r.setStatusConditions(ctx, req,
metav1.Condition{
Type: v1alpha1.DynamoDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: "Reconciling",
Reason: "DeploymentReady",
Message: "Deployment is ready",
},
)
return
return err
} else {
logs.Info("Deployment is not ready. Setting available status condition to false.")
_, err := r.setStatusConditions(ctx, req,
metav1.Condition{
Type: v1alpha1.DynamoDeploymentConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "DeploymentNotReady",
Message: "Deployment is not ready",
},
)
return err
}
}
// IsDeploymentReady reports whether the given Deployment has finished rolling
// out and is serving at its desired scale.
//
// It returns false for a nil or paused Deployment, and true for a Deployment
// scaled to zero replicas. Otherwise the Deployment is ready only when its
// status reflects the current generation, the updated and available replica
// counts both reach the desired count, and the Deployment carries an
// Available condition whose status is True.
func IsDeploymentReady(deployment *appsv1.Deployment) bool {
	// A missing or paused Deployment is never treated as ready.
	if deployment == nil || deployment.Spec.Paused {
		return false
	}

	// An unset replica count is treated as a single desired replica.
	var want int32 = 1
	if replicas := deployment.Spec.Replicas; replicas != nil {
		want = *replicas
	}

	st := &deployment.Status
	switch {
	case want == 0:
		// Nothing is supposed to run, so there is nothing to wait for.
		return true
	case st.ObservedGeneration < deployment.Generation:
		// The status still describes an older spec.
		return false
	case st.UpdatedReplicas < want, st.AvailableReplicas < want:
		// The rollout has not reached the desired scale yet.
		return false
	}

	// The counters look good; additionally require the Available condition
	// to be reported as True.
	for i := range st.Conditions {
		cond := &st.Conditions[i]
		if cond.Type != appsv1.DeploymentAvailable {
			continue
		}
		if cond.Status == corev1.ConditionTrue {
			return true
		}
	}

	// No Available=True condition was found.
	return false
}
func (r *DynamoNimDeploymentReconciler) reconcilePVC(ctx context.Context, crd *v1alpha1.DynamoNimDeployment) (*corev1.PersistentVolumeClaim, error) {
......@@ -440,7 +496,8 @@ func (r *DynamoNimDeploymentReconciler) reconcilePVC(ctx context.Context, crd *v
func (r *DynamoNimDeploymentReconciler) setStatusConditions(ctx context.Context, req ctrl.Request, conditions ...metav1.Condition) (dynamoNimDeployment *v1alpha1.DynamoNimDeployment, err error) {
dynamoNimDeployment = &v1alpha1.DynamoNimDeployment{}
for i := 0; i < 3; i++ {
maxRetries := 3
for range maxRetries - 1 {
if err = r.Get(ctx, req.NamespacedName, dynamoNimDeployment); err != nil {
err = errors.Wrap(err, "Failed to re-fetch DynamoNimDeployment")
return
......@@ -449,7 +506,11 @@ func (r *DynamoNimDeploymentReconciler) setStatusConditions(ctx context.Context,
meta.SetStatusCondition(&dynamoNimDeployment.Status.Conditions, condition)
}
if err = r.Status().Update(ctx, dynamoNimDeployment); err != nil {
if k8serrors.IsConflict(err) {
time.Sleep(100 * time.Millisecond)
continue
}
break
} else {
break
}
......@@ -465,295 +526,153 @@ func (r *DynamoNimDeploymentReconciler) setStatusConditions(ctx context.Context,
return
}
// cachedYataiConf memoizes the first successfully loaded Yatai configuration so
// later calls to getYataiClient skip the lookup.
// NOTE(review): read and written without synchronization in getYataiClient —
// confirm this is safe for the configured reconcile concurrency.
var cachedYataiConf *commonconfig.YataiConfig

// getYataiClient builds a Yatai API client from the (cached) Yatai config.
//
// Returns:
//   - (nil, nil, wrapped error) when loading the config fails with anything
//     other than a not-found error;
//   - (nil, nil, the not-found error) when the config does not exist;
//   - (nil, nil, nil) when the config has an empty endpoint;
//   - otherwise a pointer to the new client and a pointer to the cluster name,
//     which falls back to DefaultClusterName when the config leaves it empty.
func (r *DynamoNimDeploymentReconciler) getYataiClient(ctx context.Context) (yataiClient **yataiclient.YataiClient, clusterName *string, err error) {
	conf := cachedYataiConf
	if conf == nil {
		conf, err = commonconfig.GetYataiConfig(ctx)
		switch {
		case err == nil:
			// Only a successful lookup is cached.
			cachedYataiConf = conf
		case k8serrors.IsNotFound(err):
			// A missing config is handed back unwrapped.
			return nil, nil, err
		default:
			return nil, nil, errors.Wrap(err, "get yatai config")
		}
	}

	// Without an endpoint there is no client to build; this is not an error.
	if conf.Endpoint == "" {
		return nil, nil, nil
	}

	name := conf.ClusterName
	if name == "" {
		name = DefaultClusterName
	}

	token := fmt.Sprintf("%s:%s:%s", commonconsts.YataiDeploymentComponentName, name, conf.ApiToken)
	newClient := yataiclient.NewYataiClient(conf.Endpoint, token)
	return &newClient, &name, nil
}
// getYataiClientWithAuth returns the Yatai client from getYataiClient with
// auth headers set from the labels of dynamoNimDeployment.
//
// The organization and user IDs are read from the labels keyed by
// commonconsts.NgcOrganizationHeaderName and commonconsts.NgcUserHeaderName,
// falling back to commonconsts.DefaultOrgId / commonconsts.DefaultUserId when
// a label is absent.
//
// Any error from getYataiClient is returned as-is. When getYataiClient yields
// no client without an error (it does so for an empty endpoint), this returns
// (nil, nil, nil) instead of dereferencing the nil client.
func (r *DynamoNimDeploymentReconciler) getYataiClientWithAuth(ctx context.Context, dynamoNimDeployment *v1alpha1.DynamoNimDeployment) (**yataiclient.YataiClient, *string, error) {
	orgID, ok := dynamoNimDeployment.Labels[commonconsts.NgcOrganizationHeaderName]
	if !ok {
		orgID = commonconsts.DefaultOrgId
	}

	userID, ok := dynamoNimDeployment.Labels[commonconsts.NgcUserHeaderName]
	if !ok {
		userID = commonconsts.DefaultUserId
	}

	client, clusterName, err := r.getYataiClient(ctx)
	if err != nil {
		return nil, nil, err
	}
	// getYataiClient reports "no client configured" as a nil client with a nil
	// error; calling SetAuth through it would panic.
	if client == nil {
		return nil, nil, nil
	}

	(*client).SetAuth(yataiclient.DynamoAuthHeaders{
		OrgId:  orgID,
		UserId: userID,
	})
	return client, clusterName, nil
}
// createOrUpdateOrDeleteDeploymentsOption bundles the inputs of
// createOrUpdateOrDeleteDeployments.
type createOrUpdateOrDeleteDeploymentsOption struct {
// yataiClient is the Yatai API client handle; Reconcile fills it with the
// value returned by getYataiClientWithAuth.
yataiClient **yataiclient.YataiClient
// dynamoNimDeployment is the DynamoNimDeployment being reconciled.
dynamoNimDeployment *v1alpha1.DynamoNimDeployment
// dynamoNim is the DynamoNim object passed alongside the deployment
// (presumably the one named by its spec — confirm against Reconcile).
dynamoNim *v1alpha1.DynamoNim
// clusterName is the cluster name returned together with yataiClient.
clusterName *string
}
//nolint:nakedret
func (r *DynamoNimDeploymentReconciler) createOrUpdateOrDeleteDeployments(ctx context.Context, opt createOrUpdateOrDeleteDeploymentsOption) (modified bool, err error) {
func (r *DynamoNimDeploymentReconciler) createOrUpdateOrDeleteDeployments(ctx context.Context, opt generateResourceOption) (modified bool, depl *appsv1.Deployment, err error) {
containsStealingTrafficDebugModeEnabled := checkIfContainsStealingTrafficDebugModeEnabled(opt.dynamoNimDeployment)
modified, err = r.createOrUpdateDeployment(ctx, createOrUpdateDeploymentOption{
createOrUpdateOrDeleteDeploymentsOption: opt,
// create the main deployment
modified, depl, err = createOrUpdateResource(ctx, r, generateResourceOption{
dynamoNimDeployment: opt.dynamoNimDeployment,
dynamoNim: opt.dynamoNim,
isStealingTrafficDebugModeEnabled: false,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
})
}, r.generateDeployment)
if err != nil {
err = errors.Wrap(err, "create or update deployment")
return
}
if containsStealingTrafficDebugModeEnabled {
modified, err = r.createOrUpdateDeployment(ctx, createOrUpdateDeploymentOption{
createOrUpdateOrDeleteDeploymentsOption: opt,
// create the debug deployment
modified2, _, err := createOrUpdateResource(ctx, r, generateResourceOption{
dynamoNimDeployment: opt.dynamoNimDeployment,
dynamoNim: opt.dynamoNim,
isStealingTrafficDebugModeEnabled: true,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
})
}, r.generateDeployment)
if err != nil {
err = errors.Wrap(err, "create or update deployment")
return
err = errors.Wrap(err, "create or update debug deployment")
}
} else {
debugDeploymentName := r.getKubeName(opt.dynamoNimDeployment, opt.dynamoNim, true)
debugDeployment := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: debugDeploymentName, Namespace: opt.dynamoNimDeployment.Namespace}, debugDeployment)
isNotFound := k8serrors.IsNotFound(err)
if err != nil && !isNotFound {
err = errors.Wrap(err, "get deployment")
modified = modified || modified2
return
}
err = nil
if !isNotFound {
err = r.Delete(ctx, debugDeployment)
if err != nil {
err = errors.Wrap(err, "delete deployment")
return
}
modified = true
}
}
return
}
// createOrUpdateDeploymentOption is the per-Deployment input of
// createOrUpdateDeployment: the shared options plus the debug-mode flags.
type createOrUpdateDeploymentOption struct {
createOrUpdateOrDeleteDeploymentsOption
// isStealingTrafficDebugModeEnabled is false for the main deployment and true
// for the debug deployment.
isStealingTrafficDebugModeEnabled bool
// containsStealingTrafficDebugModeEnabled carries the result of
// checkIfContainsStealingTrafficDebugModeEnabled for the DynamoNimDeployment.
containsStealingTrafficDebugModeEnabled bool
}
//nolint:nakedret
func (r *DynamoNimDeploymentReconciler) createOrUpdateDeployment(ctx context.Context, opt createOrUpdateDeploymentOption) (modified bool, err error) {
func createOrUpdateResource[T client.Object](ctx context.Context, r *DynamoNimDeploymentReconciler, opt generateResourceOption, generateResource func(ctx context.Context, opt generateResourceOption) (T, bool, error)) (modified bool, res T, err error) {
logs := log.FromContext(ctx)
deployment, err := r.generateDeployment(ctx, generateDeploymentOption{
dynamoNimDeployment: opt.dynamoNimDeployment,
dynamoNim: opt.dynamoNim,
yataiClient: opt.yataiClient,
clusterName: opt.clusterName,
isStealingTrafficDebugModeEnabled: opt.isStealingTrafficDebugModeEnabled,
containsStealingTrafficDebugModeEnabled: opt.containsStealingTrafficDebugModeEnabled,
})
resource, toDelete, err := generateResource(ctx, opt)
if err != nil {
return
}
resourceNamespace := resource.GetNamespace()
resourceName := resource.GetName()
resourceType := reflect.TypeOf(resource).Elem().Name()
logs = logs.WithValues("namespace", resourceNamespace, "resourceName", resourceName, "resourceType", resourceType)
logs = logs.WithValues("namespace", deployment.Namespace, "deploymentName", deployment.Name)
deploymentNamespacedName := fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "GetDeployment", "Getting Deployment %s", deploymentNamespacedName)
oldDeployment := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, oldDeployment)
oldDeploymentIsNotFound := k8serrors.IsNotFound(err)
if err != nil && !oldDeploymentIsNotFound {
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "GetDeployment", "Failed to get Deployment %s: %s", deploymentNamespacedName, err)
logs.Error(err, "Failed to get Deployment.")
// Retrieve the GroupVersionKind (GVK) of the desired object
gvk, err := apiutil.GVKForObject(resource, r.Client.Scheme())
if err != nil {
logs.Error(err, "Failed to get GVK for object")
return
}
if oldDeploymentIsNotFound {
logs.Info("Deployment not found. Creating a new one.")
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(deployment), "set last applied annotation for deployment %s", deployment.Name)
// Create a new instance of the object
obj, err := r.Client.Scheme().New(gvk)
if err != nil {
logs.Error(err, "Failed to set last applied annotation.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "SetLastAppliedAnnotation", "Failed to set last applied annotation for Deployment %s: %s", deploymentNamespacedName, err)
logs.Error(err, "Failed to create a new object for GVK")
return
}
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "CreateDeployment", "Creating a new Deployment %s", deploymentNamespacedName)
err = r.Create(ctx, deployment)
if err != nil {
logs.Error(err, "Failed to create Deployment.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "CreateDeployment", "Failed to create Deployment %s: %s", deploymentNamespacedName, err)
// Type assertion to ensure the object implements client.Object
oldResource, ok := obj.(T)
if !ok {
return
}
logs.Info("Deployment created.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "CreateDeployment", "Created Deployment %s", deploymentNamespacedName)
modified = true
} else {
logs.Info("Deployment found.")
var patchResult *patch.PatchResult
patchResult, err = patch.DefaultPatchMaker.Calculate(oldDeployment, deployment)
if err != nil {
logs.Error(err, "Failed to calculate patch.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "CalculatePatch", "Failed to calculate patch for Deployment %s: %s", deploymentNamespacedName, err)
err = r.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: resourceNamespace}, oldResource)
oldResourceIsNotFound := k8serrors.IsNotFound(err)
if err != nil && !oldResourceIsNotFound {
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, fmt.Sprintf("Get%s", resourceType), "Failed to get %s %s: %s", resourceType, resourceNamespace, err)
logs.Error(err, "Failed to get HPA.")
return
}
err = nil
if !patchResult.IsEmpty() {
logs.Info("Deployment spec is different. Updating Deployment.")
if oldResourceIsNotFound {
if toDelete {
logs.Info("Resource not found. Nothing to do.")
return
}
logs.Info("Resource not found. Creating a new one.")
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(deployment), "set last applied annotation for deployment %s", deployment.Name)
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(resource), "set last applied annotation for resource %s", resourceName)
if err != nil {
logs.Error(err, "Failed to set last applied annotation.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "SetLastAppliedAnnotation", "Failed to set last applied annotation for Deployment %s: %s", deploymentNamespacedName, err)
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "SetLastAppliedAnnotation", "Failed to set last applied annotation for %s %s: %s", resourceType, resourceNamespace, err)
return
}
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "UpdateDeployment", "Updating Deployment %s", deploymentNamespacedName)
err = r.Update(ctx, deployment)
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, fmt.Sprintf("Create%s", resourceType), "Creating a new %s %s", resourceType, resourceNamespace)
err = r.Create(ctx, resource)
if err != nil {
logs.Error(err, "Failed to update Deployment.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "UpdateDeployment", "Failed to update Deployment %s: %s", deploymentNamespacedName, err)
logs.Error(err, "Failed to create Resource.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, fmt.Sprintf("Create%s", resourceType), "Failed to create %s %s: %s", resourceType, resourceNamespace, err)
return
}
logs.Info("Deployment updated.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "UpdateDeployment", "Updated Deployment %s", deploymentNamespacedName)
logs.Info(fmt.Sprintf("%s created.", resourceType))
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, fmt.Sprintf("Create%s", resourceType), "Created %s %s", resourceType, resourceNamespace)
modified = true
res = resource
} else {
logs.Info("Deployment spec is the same. Skipping update.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "UpdateDeployment", "Skipping update Deployment %s", deploymentNamespacedName)
}
}
return
}
//nolint:nakedret
func (r *DynamoNimDeploymentReconciler) createOrUpdateHPA(ctx context.Context, dynamoNimDeployment *v1alpha1.DynamoNimDeployment, dynamoNim *v1alpha1.DynamoNim) (modified bool, err error) {
logs := log.FromContext(ctx)
hpa, err := r.generateHPA(dynamoNimDeployment, dynamoNim)
logs.Info(fmt.Sprintf("%s found.", resourceType))
if toDelete {
logs.Info(fmt.Sprintf("%s not found. Deleting the existing one.", resourceType))
err = r.Delete(ctx, oldResource)
if err != nil {
logs.Error(err, fmt.Sprintf("Failed to delete %s.", resourceType))
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, fmt.Sprintf("Delete%s", resourceType), "Failed to delete %s %s: %s", resourceType, resourceNamespace, err)
return
}
logs = logs.WithValues("namespace", hpa.Namespace, "hpaName", hpa.Name)
hpaNamespacedName := fmt.Sprintf("%s/%s", hpa.Namespace, hpa.Name)
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "GetHPA", "Getting HPA %s", hpaNamespacedName)
oldHPA, err := r.getHPA(ctx, hpa)
oldHPAIsNotFound := k8serrors.IsNotFound(err)
if err != nil && !oldHPAIsNotFound {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "GetHPA", "Failed to get HPA %s: %s", hpaNamespacedName, err)
logs.Error(err, "Failed to get HPA.")
return
}
if oldHPAIsNotFound {
logs.Info("HPA not found. Creating a new one.")
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(hpa), "set last applied annotation for hpa %s", hpa.Name)
if err != nil {
logs.Error(err, "Failed to set last applied annotation.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "SetLastAppliedAnnotation", "Failed to set last applied annotation for HPA %s: %s", hpaNamespacedName, err)
return
}
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "CreateHPA", "Creating a new HPA %s", hpaNamespacedName)
err = r.Create(ctx, hpa)
if err != nil {
logs.Error(err, "Failed to create HPA.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "CreateHPA", "Failed to create HPA %s: %s", hpaNamespacedName, err)
logs.Info(fmt.Sprintf("%s deleted.", resourceType))
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, fmt.Sprintf("Delete%s", resourceType), "Deleted %s %s", resourceType, resourceNamespace)
modified = true
return
}
logs.Info("HPA created.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "CreateHPA", "Created HPA %s", hpaNamespacedName)
modified = true
} else {
logs.Info("HPA found.")
var patchResult *patch.PatchResult
patchResult, err = patch.DefaultPatchMaker.Calculate(oldHPA, hpa)
patchResult, err = patch.DefaultPatchMaker.Calculate(oldResource, resource)
if err != nil {
logs.Error(err, "Failed to calculate patch.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "CalculatePatch", "Failed to calculate patch for HPA %s: %s", hpaNamespacedName, err)
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, fmt.Sprintf("CalculatePatch%s", resourceType), "Failed to calculate patch for %s %s: %s", resourceType, resourceNamespace, err)
return
}
if !patchResult.IsEmpty() {
logs.Info(fmt.Sprintf("HPA spec is different. Updating HPA. The patch result is: %s", patchResult.String()))
logs.Info(fmt.Sprintf("%s spec is different. Updating %s. The patch result is: %s", resourceType, resourceType, patchResult.String()))
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(hpa), "set last applied annotation for hpa %s", hpa.Name)
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(resource), "set last applied annotation for resource %s", resourceName)
if err != nil {
logs.Error(err, "Failed to set last applied annotation.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "SetLastAppliedAnnotation", "Failed to set last applied annotation for HPA %s: %s", hpaNamespacedName, err)
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, fmt.Sprintf("SetLastAppliedAnnotation%s", resourceType), "Failed to set last applied annotation for %s %s: %s", resourceType, resourceNamespace, err)
return
}
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateHPA", "Updating HPA %s", hpaNamespacedName)
err = r.Update(ctx, hpa)
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, fmt.Sprintf("Update%s", resourceType), "Updating %s %s", resourceType, resourceNamespace)
resource.SetResourceVersion(oldResource.GetResourceVersion())
err = r.Update(ctx, resource)
if err != nil {
logs.Error(err, "Failed to update HPA.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "UpdateHPA", "Failed to update HPA %s: %s", hpaNamespacedName, err)
logs.Error(err, fmt.Sprintf("Failed to update %s.", resourceType))
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, fmt.Sprintf("Update%s", resourceType), "Failed to update %s %s: %s", resourceType, resourceNamespace, err)
return
}
logs.Info("HPA updated.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateHPA", "Updated HPA %s", hpaNamespacedName)
logs.Info(fmt.Sprintf("%s updated.", resourceType))
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, fmt.Sprintf("Update%s", resourceType), "Updated %s %s", resourceType, resourceNamespace)
modified = true
res = resource
} else {
logs.Info("HPA spec is the same. Skipping update.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateHPA", "Skipping update HPA %s", hpaNamespacedName)
logs.Info(fmt.Sprintf("%s spec is the same. Skipping update.", resourceType))
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, fmt.Sprintf("Update%s", resourceType), "Skipping update %s %s", resourceType, resourceNamespace)
res = oldResource
}
}
return
}
......@@ -794,199 +713,60 @@ func checkIfContainsStealingTrafficDebugModeEnabled(dynamoNimDeployment *v1alpha
return checkIfIsStealingTrafficDebugModeEnabled(dynamoNimDeployment.Spec.Annotations)
}
// createOrUpdateOrDeleteServicesOption bundles the inputs of
// createOrUpdateOrDeleteServices.
type createOrUpdateOrDeleteServicesOption struct {
// dynamoNimDeployment is the DynamoNimDeployment whose Services are managed.
dynamoNimDeployment *v1alpha1.DynamoNimDeployment
// dynamoNim is the DynamoNim object passed alongside the deployment.
dynamoNim *v1alpha1.DynamoNim
}
//nolint:nakedret
func (r *DynamoNimDeploymentReconciler) createOrUpdateOrDeleteServices(ctx context.Context, opt createOrUpdateOrDeleteServicesOption) (modified bool, err error) {
func (r *DynamoNimDeploymentReconciler) createOrUpdateOrDeleteServices(ctx context.Context, opt generateResourceOption) (modified bool, err error) {
resourceAnnotations := getResourceAnnotations(opt.dynamoNimDeployment)
isDebugPodReceiveProductionTrafficEnabled := checkIfIsDebugPodReceiveProductionTrafficEnabled(resourceAnnotations)
containsStealingTrafficDebugModeEnabled := checkIfContainsStealingTrafficDebugModeEnabled(opt.dynamoNimDeployment)
modified, err = r.createOrUpdateService(ctx, createOrUpdateServiceOption{
// main generic service
modified, _, err = createOrUpdateResource(ctx, r, generateResourceOption{
dynamoNimDeployment: opt.dynamoNimDeployment,
dynamoNim: opt.dynamoNim,
isStealingTrafficDebugModeEnabled: false,
isDebugPodReceiveProductionTraffic: isDebugPodReceiveProductionTrafficEnabled,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
isGenericService: true,
})
}, r.generateService)
if err != nil {
return
}
if containsStealingTrafficDebugModeEnabled {
var modified_ bool
modified_, err = r.createOrUpdateService(ctx, createOrUpdateServiceOption{
// debug production service (if enabled)
modified_, _, err := createOrUpdateResource(ctx, r, generateResourceOption{
dynamoNimDeployment: opt.dynamoNimDeployment,
dynamoNim: opt.dynamoNim,
isStealingTrafficDebugModeEnabled: false,
isDebugPodReceiveProductionTraffic: isDebugPodReceiveProductionTrafficEnabled,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
isGenericService: false,
})
}, r.generateService)
if err != nil {
return
}
if modified_ {
modified = true
}
modified_, err = r.createOrUpdateService(ctx, createOrUpdateServiceOption{
modified = modified || modified_
// debug service (if enabled)
modified_, _, err = createOrUpdateResource(ctx, r, generateResourceOption{
dynamoNimDeployment: opt.dynamoNimDeployment,
dynamoNim: opt.dynamoNim,
isStealingTrafficDebugModeEnabled: true,
isDebugPodReceiveProductionTraffic: isDebugPodReceiveProductionTrafficEnabled,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
isGenericService: false,
})
if err != nil {
return
}
if modified_ {
modified = true
}
} else {
productionServiceName := r.getServiceName(opt.dynamoNimDeployment, opt.dynamoNim, false)
svc := &corev1.Service{}
err = r.Get(ctx, types.NamespacedName{Name: productionServiceName, Namespace: opt.dynamoNimDeployment.Namespace}, svc)
isNotFound := k8serrors.IsNotFound(err)
if err != nil && !isNotFound {
err = errors.Wrapf(err, "Failed to get service %s", productionServiceName)
return
}
if !isNotFound {
modified = true
err = r.Delete(ctx, svc)
if err != nil {
err = errors.Wrapf(err, "Failed to delete service %s", productionServiceName)
return
}
}
debugServiceName := r.getServiceName(opt.dynamoNimDeployment, opt.dynamoNim, true)
svc = &corev1.Service{}
err = r.Get(ctx, types.NamespacedName{Name: debugServiceName, Namespace: opt.dynamoNimDeployment.Namespace}, svc)
isNotFound = k8serrors.IsNotFound(err)
if err != nil && !isNotFound {
err = errors.Wrapf(err, "Failed to get service %s", debugServiceName)
return
}
err = nil
if !isNotFound {
modified = true
err = r.Delete(ctx, svc)
if err != nil {
err = errors.Wrapf(err, "Failed to delete service %s", debugServiceName)
return
}
}
}
return
}
// createOrUpdateServiceOption is the per-Service input of createOrUpdateService.
// createOrUpdateService converts it directly with generateServiceOption(opt), so
// its fields must stay convertible to generateServiceOption.
type createOrUpdateServiceOption struct {
// dynamoNimDeployment is the DynamoNimDeployment that the Service belongs to.
dynamoNimDeployment *v1alpha1.DynamoNimDeployment
// dynamoNim is the DynamoNim object passed alongside the deployment.
dynamoNim *v1alpha1.DynamoNim
// isStealingTrafficDebugModeEnabled is true only for the debug service.
isStealingTrafficDebugModeEnabled bool
// isDebugPodReceiveProductionTraffic carries the result of
// checkIfIsDebugPodReceiveProductionTrafficEnabled on the resource annotations.
isDebugPodReceiveProductionTraffic bool
// containsStealingTrafficDebugModeEnabled carries the result of
// checkIfContainsStealingTrafficDebugModeEnabled for the DynamoNimDeployment.
containsStealingTrafficDebugModeEnabled bool
// isGenericService is true for the main generic service and false for the
// production and debug variants.
isGenericService bool
}
//nolint:nakedret
func (r *DynamoNimDeploymentReconciler) createOrUpdateService(ctx context.Context, opt createOrUpdateServiceOption) (modified bool, err error) {
logs := log.FromContext(ctx)
// nolint: gosimple
service, err := r.generateService(generateServiceOption(opt))
if err != nil {
return
}
logs = logs.WithValues("namespace", service.Namespace, "serviceName", service.Name, "serviceSelector", service.Spec.Selector)
serviceNamespacedName := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "GetService", "Getting Service %s", serviceNamespacedName)
oldService := &corev1.Service{}
err = r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, oldService)
oldServiceIsNotFound := k8serrors.IsNotFound(err)
if err != nil && !oldServiceIsNotFound {
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "GetService", "Failed to get Service %s: %s", serviceNamespacedName, err)
logs.Error(err, "Failed to get Service.")
return
}
if oldServiceIsNotFound {
logs.Info("Service not found. Creating a new one.")
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(service), "set last applied annotation for service %s", service.Name)
if err != nil {
logs.Error(err, "Failed to set last applied annotation.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "SetLastAppliedAnnotation", "Failed to set last applied annotation for Service %s: %s", serviceNamespacedName, err)
return
}
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "CreateService", "Creating a new Service %s", serviceNamespacedName)
err = r.Create(ctx, service)
}, r.generateService)
if err != nil {
logs.Error(err, "Failed to create Service.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "CreateService", "Failed to create Service %s: %s", serviceNamespacedName, err)
return
}
logs.Info("Service created.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "CreateService", "Created Service %s", serviceNamespacedName)
modified = true
} else {
logs.Info("Service found.")
var patchResult *patch.PatchResult
patchResult, err = patch.DefaultPatchMaker.Calculate(oldService, service)
if err != nil {
logs.Error(err, "Failed to calculate patch.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "CalculatePatch", "Failed to calculate patch for Service %s: %s", serviceNamespacedName, err)
return
}
if !patchResult.IsEmpty() {
logs.Info("Service spec is different. Updating Service.")
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(service), "set last applied annotation for service %s", service.Name)
if err != nil {
logs.Error(err, "Failed to set last applied annotation.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "SetLastAppliedAnnotation", "Failed to set last applied annotation for Service %s: %s", serviceNamespacedName, err)
return
}
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "UpdateService", "Updating Service %s", serviceNamespacedName)
oldService.Annotations = service.Annotations
oldService.Labels = service.Labels
oldService.Spec = service.Spec
err = r.Update(ctx, oldService)
if err != nil {
logs.Error(err, "Failed to update Service.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeWarning, "UpdateService", "Failed to update Service %s: %s", serviceNamespacedName, err)
return
}
logs.Info("Service updated.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "UpdateService", "Updated Service %s", serviceNamespacedName)
modified = true
} else {
logs = logs.WithValues("oldServiceSelector", oldService.Spec.Selector)
logs.Info("Service spec is the same. Skipping update.")
r.Recorder.Eventf(opt.dynamoNimDeployment, corev1.EventTypeNormal, "UpdateService", "Skipping update Service %s", serviceNamespacedName)
}
}
modified = modified || modified_
return
}
func (r *DynamoNimDeploymentReconciler) createOrUpdateVirtualService(ctx context.Context, dynamoNimDeployment *v1alpha1.DynamoNimDeployment) (bool, error) {
func (r *DynamoNimDeploymentReconciler) generateVirtualService(ctx context.Context, opt generateResourceOption) (*networkingv1beta1.VirtualService, bool, error) {
log := log.FromContext(ctx)
log.Info("Starting createOrUpdateVirtualService")
vsName := dynamoNimDeployment.Name
if dynamoNimDeployment.Spec.Ingress.HostPrefix != nil {
vsName = *dynamoNimDeployment.Spec.Ingress.HostPrefix + vsName
log.Info("Starting generateVirtualService")
vsName := opt.dynamoNimDeployment.Name
if opt.dynamoNimDeployment.Spec.Ingress.HostPrefix != nil {
vsName = *opt.dynamoNimDeployment.Spec.Ingress.HostPrefix + vsName
}
ingressSuffix, found := os.LookupEnv("DYNAMO_INGRESS_SUFFIX")
if !found || ingressSuffix == "" {
......@@ -994,10 +774,18 @@ func (r *DynamoNimDeploymentReconciler) createOrUpdateVirtualService(ctx context
}
vs := &networkingv1beta1.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: dynamoNimDeployment.Name,
Namespace: dynamoNimDeployment.Namespace,
Name: opt.dynamoNimDeployment.Name,
Namespace: opt.dynamoNimDeployment.Namespace,
},
Spec: istioNetworking.VirtualService{
}
vsEnabled := opt.dynamoNimDeployment.Spec.Ingress.Enabled && opt.dynamoNimDeployment.Spec.Ingress.UseVirtualService != nil && *opt.dynamoNimDeployment.Spec.Ingress.UseVirtualService
if !vsEnabled {
log.Info("VirtualService is not enabled")
return vs, true, nil
}
vs.Spec = istioNetworking.VirtualService{
Hosts: []string{
fmt.Sprintf("%s.%s", vsName, ingressSuffix),
},
......@@ -1014,7 +802,7 @@ func (r *DynamoNimDeploymentReconciler) createOrUpdateVirtualService(ctx context
Route: []*istioNetworking.HTTPRouteDestination{
{
Destination: &istioNetworking.Destination{
Host: dynamoNimDeployment.Name,
Host: opt.dynamoNimDeployment.Name,
Port: &istioNetworking.PortSelector{
Number: 3000,
},
......@@ -1023,75 +811,8 @@ func (r *DynamoNimDeploymentReconciler) createOrUpdateVirtualService(ctx context
},
},
},
},
}
log.Info("VirtualService object constructed", "VirtualService", vs)
oldVS := &networkingv1beta1.VirtualService{}
err := r.Get(ctx, types.NamespacedName{Name: vs.Name, Namespace: vs.Namespace}, oldVS)
if client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to get VirtualService")
return false, err
}
vsEnabled := dynamoNimDeployment.Spec.Ingress.Enabled && dynamoNimDeployment.Spec.Ingress.UseVirtualService != nil && *dynamoNimDeployment.Spec.Ingress.UseVirtualService
if err := ctrl.SetControllerReference(dynamoNimDeployment, vs, r.Scheme); err != nil {
log.Error(err, "Failed to set controller reference for the VirtualService")
return false, err
}
if err != nil {
if vsEnabled {
log.Info("VirtualService not found, creating new one")
if err := r.Create(ctx, vs); err != nil {
log.Error(err, "Failed to create VirtualService")
return false, err
}
log.Info("VirtualService created successfully", "VirtualService", vs)
return true, nil
}
return false, nil
}
if !vsEnabled {
log.Info("VirtualService found, deleting", "OldVirtualService", oldVS)
if err := r.Delete(ctx, oldVS); err != nil {
log.Error(err, "Failed to delete VirtualService")
return false, err
}
return true, err
}
log.Info("VirtualService found, updating", "OldVirtualService", oldVS)
vs.ObjectMeta.ResourceVersion = oldVS.ObjectMeta.ResourceVersion
if err := r.Update(ctx, vs); err != nil {
log.Error(err, "Failed to update VirtualService")
return false, err
}
log.Info("VirtualService updated successfully", "VirtualService", oldVS)
return true, nil
}
type createOrUpdateIngressOption struct {
yataiClient **yataiclient.YataiClient
dynamoNimDeployment *v1alpha1.DynamoNimDeployment
dynamoNim *v1alpha1.DynamoNim
}
//nolint:nakedret
func (r *DynamoNimDeploymentReconciler) createOrUpdateIngresses(ctx context.Context, opt createOrUpdateIngressOption) (modified bool, err error) {
dynamoNimDeployment := opt.dynamoNimDeployment
modified, err = r.createOrUpdateVirtualService(ctx, dynamoNimDeployment)
if err != nil {
return false, err
}
return modified, nil
return vs, false, nil
}
func (r *DynamoNimDeploymentReconciler) getKubeName(dynamoNimDeployment *v1alpha1.DynamoNimDeployment, _ *v1alpha1.DynamoNim, debug bool) string {
......@@ -1146,31 +867,36 @@ func (r *DynamoNimDeploymentReconciler) getKubeAnnotations(dynamoNimDeployment *
return annotations
}
type generateDeploymentOption struct {
dynamoNimDeployment *v1alpha1.DynamoNimDeployment
dynamoNim *v1alpha1.DynamoNim
yataiClient **yataiclient.YataiClient
clusterName *string
isStealingTrafficDebugModeEnabled bool
containsStealingTrafficDebugModeEnabled bool
}
//nolint:nakedret
func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context, opt generateDeploymentOption) (kubeDeployment *appsv1.Deployment, err error) {
func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context, opt generateResourceOption) (kubeDeployment *appsv1.Deployment, toDelete bool, err error) {
kubeNs := opt.dynamoNimDeployment.Namespace
// nolint: gosimple
podTemplateSpec, err := r.generatePodTemplateSpec(ctx, generatePodTemplateSpecOption(opt))
if err != nil {
return
}
labels := r.getKubeLabels(opt.dynamoNimDeployment, opt.dynamoNim)
annotations := r.getKubeAnnotations(opt.dynamoNimDeployment, opt.dynamoNim)
kubeName := r.getKubeName(opt.dynamoNimDeployment, opt.dynamoNim, opt.isStealingTrafficDebugModeEnabled)
kubeDeployment = &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: kubeName,
Namespace: kubeNs,
Labels: labels,
Annotations: annotations,
},
}
if opt.isStealingTrafficDebugModeEnabled && !opt.containsStealingTrafficDebugModeEnabled {
// if stealing traffic debug mode is enabled but disabled in the deployment, we need to delete the deployment
return kubeDeployment, true, nil
}
// nolint: gosimple
podTemplateSpec, err := r.generatePodTemplateSpec(ctx, opt)
if err != nil {
return
}
defaultMaxSurge := intstr.FromString("25%")
defaultMaxUnavailable := intstr.FromString("25%")
......@@ -1224,14 +950,7 @@ func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context,
replicas = &[]int32{int32(1)}[0]
}
kubeDeployment = &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: kubeName,
Namespace: kubeNs,
Labels: labels,
Annotations: annotations,
},
Spec: appsv1.DeploymentSpec{
kubeDeployment.Spec = appsv1.DeploymentSpec{
Replicas: replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
......@@ -1240,7 +959,6 @@ func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context,
},
Template: *podTemplateSpec,
Strategy: strategy,
},
}
err = ctrl.SetControllerReference(opt.dynamoNimDeployment, kubeDeployment, r.Scheme)
......@@ -1251,25 +969,25 @@ func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context,
return
}
func (r *DynamoNimDeploymentReconciler) generateHPA(dynamoNimDeployment *v1alpha1.DynamoNimDeployment, dynamoNim *v1alpha1.DynamoNim) (*autoscalingv2.HorizontalPodAutoscaler, error) {
labels := r.getKubeLabels(dynamoNimDeployment, dynamoNim)
annotations := r.getKubeAnnotations(dynamoNimDeployment, dynamoNim)
// generateResourceOption is the single option bundle accepted by the
// generate* resource builders of DynamoNimDeploymentReconciler
// (generateDeployment, generateHPA, generateService, generateVirtualService
// and generatePodTemplateSpec).
//
// NOTE: keep the field set and order stable; other option types may be
// converted to or from this struct by plain type conversion.
type generateResourceOption struct {
	// dynamoNimDeployment is the custom resource the child object is generated
	// for; it supplies the namespace and is set as the controller owner.
	dynamoNimDeployment *v1alpha1.DynamoNimDeployment
	// dynamoNim is passed together with dynamoNimDeployment to the helpers that
	// compute the resource name, labels and annotations.
	dynamoNim *v1alpha1.DynamoNim
	// isStealingTrafficDebugModeEnabled selects the debug variant of the
	// resource name (it is forwarded to getKubeName / getServiceName).
	isStealingTrafficDebugModeEnabled bool
	// containsStealingTrafficDebugModeEnabled is checked together with the flag
	// above: a debug-variant Deployment whose spec does not contain the debug
	// mode is reported as "to delete" instead of being generated.
	containsStealingTrafficDebugModeEnabled bool
	// NOTE(review): not read in the generators visible here; presumably consumed
	// when building the Service selector — confirm against generateService.
	isDebugPodReceiveProductionTraffic bool
	// isGenericService makes generateService use the generic service name
	// instead of the per-mode service name.
	isGenericService bool
}
kubeName := r.getKubeName(dynamoNimDeployment, dynamoNim, false)
func (r *DynamoNimDeploymentReconciler) generateHPA(ctx context.Context, opt generateResourceOption) (*autoscalingv2.HorizontalPodAutoscaler, bool, error) {
labels := r.getKubeLabels(opt.dynamoNimDeployment, opt.dynamoNim)
kubeNs := dynamoNimDeployment.Namespace
annotations := r.getKubeAnnotations(opt.dynamoNimDeployment, opt.dynamoNim)
hpaConf := dynamoNimDeployment.Spec.Autoscaling
kubeName := r.getKubeName(opt.dynamoNimDeployment, opt.dynamoNim, false)
if hpaConf == nil {
hpaConf = &v1alpha1.Autoscaling{
MinReplicas: 1,
MaxReplicas: 1,
}
}
kubeNs := opt.dynamoNimDeployment.Namespace
minReplica := int32(hpaConf.MinReplicas)
hpaConf := opt.dynamoNimDeployment.Spec.Autoscaling
kubeHpa := &autoscalingv2.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
......@@ -1278,7 +996,16 @@ func (r *DynamoNimDeploymentReconciler) generateHPA(dynamoNimDeployment *v1alpha
Labels: labels,
Annotations: annotations,
},
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
}
if hpaConf == nil || !hpaConf.Enabled {
// if hpa is not enabled, we need to delete the hpa
return kubeHpa, true, nil
}
minReplica := int32(hpaConf.MinReplicas)
kubeHpa.Spec = autoscalingv2.HorizontalPodAutoscalerSpec{
MinReplicas: &minReplica,
MaxReplicas: int32(hpaConf.MaxReplicas),
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
......@@ -1287,7 +1014,6 @@ func (r *DynamoNimDeploymentReconciler) generateHPA(dynamoNimDeployment *v1alpha
Name: kubeName,
},
Metrics: hpaConf.Metrics,
},
}
if len(kubeHpa.Spec.Metrics) == 0 {
......@@ -1306,26 +1032,12 @@ func (r *DynamoNimDeploymentReconciler) generateHPA(dynamoNimDeployment *v1alpha
}
}
err := ctrl.SetControllerReference(dynamoNimDeployment, kubeHpa, r.Scheme)
err := ctrl.SetControllerReference(opt.dynamoNimDeployment, kubeHpa, r.Scheme)
if err != nil {
return nil, errors.Wrapf(err, "set hpa %s controller reference", kubeName)
return nil, false, errors.Wrapf(err, "set hpa %s controller reference", kubeName)
}
return kubeHpa, err
}
func (r *DynamoNimDeploymentReconciler) getHPA(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) (client.Object, error) {
name, ns := hpa.Name, hpa.Namespace
obj := &autoscalingv2.HorizontalPodAutoscaler{}
err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: ns}, obj)
if err == nil {
legacyStatus := &autoscalingv2.HorizontalPodAutoscalerStatus{}
if err := copier.Copy(legacyStatus, obj.Status); err != nil {
return nil, err
}
obj.Status = *legacyStatus
}
return obj, err
return kubeHpa, false, err
}
func getDynamoNimRepositoryNameAndDynamoNimVersion(dynamoNim *v1alpha1.DynamoNim) (repositoryName string, version string) {
......@@ -1334,17 +1046,8 @@ func getDynamoNimRepositoryNameAndDynamoNimVersion(dynamoNim *v1alpha1.DynamoNim
return
}
type generatePodTemplateSpecOption struct {
dynamoNimDeployment *v1alpha1.DynamoNimDeployment
dynamoNim *v1alpha1.DynamoNim
yataiClient **yataiclient.YataiClient
clusterName *string
isStealingTrafficDebugModeEnabled bool
containsStealingTrafficDebugModeEnabled bool
}
//nolint:gocyclo,nakedret
func (r *DynamoNimDeploymentReconciler) generatePodTemplateSpec(ctx context.Context, opt generatePodTemplateSpecOption) (podTemplateSpec *corev1.PodTemplateSpec, err error) {
func (r *DynamoNimDeploymentReconciler) generatePodTemplateSpec(ctx context.Context, opt generateResourceOption) (podTemplateSpec *corev1.PodTemplateSpec, err error) {
dynamoNimRepositoryName, _ := getDynamoNimRepositoryNameAndDynamoNimVersion(opt.dynamoNim)
podLabels := r.getKubeLabels(opt.dynamoNimDeployment, opt.dynamoNim)
if opt.isStealingTrafficDebugModeEnabled {
......@@ -2203,17 +1906,8 @@ func getResourcesConfig(resources *dynamoCommon.Resources) (corev1.ResourceRequi
return currentResources, nil
}
type generateServiceOption struct {
dynamoNimDeployment *v1alpha1.DynamoNimDeployment
dynamoNim *v1alpha1.DynamoNim
isStealingTrafficDebugModeEnabled bool
isDebugPodReceiveProductionTraffic bool
containsStealingTrafficDebugModeEnabled bool
isGenericService bool
}
//nolint:nakedret
func (r *DynamoNimDeploymentReconciler) generateService(opt generateServiceOption) (kubeService *corev1.Service, err error) {
func (r *DynamoNimDeploymentReconciler) generateService(ctx context.Context, opt generateResourceOption) (kubeService *corev1.Service, toDelete bool, err error) {
var kubeName string
if opt.isGenericService {
kubeName = r.getGenericServiceName(opt.dynamoNimDeployment, opt.dynamoNim)
......@@ -2221,6 +1915,20 @@ func (r *DynamoNimDeploymentReconciler) generateService(opt generateServiceOptio
kubeName = r.getServiceName(opt.dynamoNimDeployment, opt.dynamoNim, opt.isStealingTrafficDebugModeEnabled)
}
kubeNs := opt.dynamoNimDeployment.Namespace
kubeService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: kubeName,
Namespace: kubeNs,
},
}
if !opt.isGenericService && !opt.containsStealingTrafficDebugModeEnabled {
// if it's not a generic service and the deployment does not contain stealing traffic debug mode, the service is not needed: return it flagged for deletion
return kubeService, true, nil
}
labels := r.getKubeLabels(opt.dynamoNimDeployment, opt.dynamoNim)
selector := make(map[string]string)
......@@ -2261,17 +1969,9 @@ func (r *DynamoNimDeploymentReconciler) generateService(opt generateServiceOptio
annotations := r.getKubeAnnotations(opt.dynamoNimDeployment, opt.dynamoNim)
kubeNs := opt.dynamoNimDeployment.Namespace
kubeService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: kubeName,
Namespace: kubeNs,
Labels: labels,
Annotations: annotations,
},
Spec: spec,
}
kubeService.ObjectMeta.Annotations = annotations
kubeService.ObjectMeta.Labels = labels
kubeService.Spec = spec
err = ctrl.SetControllerReference(opt.dynamoNimDeployment, kubeService, r.Scheme)
if err != nil {
......@@ -2304,7 +2004,13 @@ func (r *DynamoNimDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error
m := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.DynamoNimDeployment{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&appsv1.Deployment{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&appsv1.Deployment{}, builder.WithPredicates(predicate.Funcs{
// ignore creation because we don't want to be called again after we create the deployment
CreateFunc: func(ce event.CreateEvent) bool { return false },
DeleteFunc: func(de event.DeleteEvent) bool { return true },
UpdateFunc: func(de event.UpdateEvent) bool { return true },
GenericFunc: func(ge event.GenericEvent) bool { return true },
})).
Owns(&corev1.Service{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&networkingv1beta1.VirtualService{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&networkingv1.Ingress{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech. Inc
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
*/
package controller
import (
"testing"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestIsDeploymentReady(t *testing.T) {
type args struct {
deployment *appsv1.Deployment
}
tests := []struct {
name string
args args
want bool
}{
{
name: "deployment is nil",
args: args{
deployment: nil,
},
want: false,
},
{
name: "not ready",
args: args{
deployment: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{},
Status: appsv1.DeploymentStatus{
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionFalse,
},
},
},
},
},
want: false,
},
{
name: "not ready (paused)",
args: args{
deployment: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Paused: true,
},
},
},
want: false,
},
{
name: "ready",
args: args{
deployment: &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
},
Spec: appsv1.DeploymentSpec{
Replicas: &[]int32{1}[0],
},
Status: appsv1.DeploymentStatus{
ObservedGeneration: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionTrue,
},
},
},
},
},
want: true,
},
{
name: "ready (no desired replicas)",
args: args{
deployment: &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
},
Spec: appsv1.DeploymentSpec{
Replicas: &[]int32{0}[0],
},
},
},
want: true,
},
{
name: "not ready (condition false)",
args: args{
deployment: &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
},
Spec: appsv1.DeploymentSpec{
Replicas: &[]int32{1}[0],
},
Status: appsv1.DeploymentStatus{
ObservedGeneration: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionFalse,
},
},
},
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsDeploymentReady(tt.args.deployment); got != tt.want {
t.Errorf("IsDeploymentReady() = %v, want %v", got, tt.want)
}
})
}
}
......@@ -80,8 +80,8 @@ func SyncResource[T Resource](ctx context.Context, c client.Client, desired T, n
// Check if the Spec has changed and update if necessary
if IsSpecChanged(current, desired) {
// update the spec of the current object with the desired spec
current.SetSpec(desired.GetSpec())
if err := c.Update(ctx, current); err != nil {
desired.SetResourceVersion(current.GetResourceVersion())
if err := c.Update(ctx, desired); err != nil {
return desired, fmt.Errorf("failed to update resource: %w", err)
}
}
......
......@@ -258,11 +258,13 @@ func GenerateDynamoNIMDeployments(parentDynamoDeployment *v1alpha1.DynamoDeploym
},
}
}
if service.Config.Autoscaling != nil {
deployment.Spec.Autoscaling = &v1alpha1.Autoscaling{
MinReplicas: service.Config.Autoscaling.MinReplicas,
MaxReplicas: service.Config.Autoscaling.MaxReplicas,
Enabled: false,
}
if service.Config.Autoscaling != nil {
deployment.Spec.Autoscaling.Enabled = true
deployment.Spec.Autoscaling.MinReplicas = service.Config.Autoscaling.MinReplicas
deployment.Spec.Autoscaling.MaxReplicas = service.Config.Autoscaling.MaxReplicas
}
deployments[service.Name] = deployment
}
......
......@@ -110,6 +110,7 @@ func TestGenerateDynamoNIMDeployments(t *testing.T) {
},
},
Autoscaling: &v1alpha1.Autoscaling{
Enabled: true,
MinReplicas: 1,
MaxReplicas: 5,
},
......@@ -130,6 +131,9 @@ func TestGenerateDynamoNIMDeployments(t *testing.T) {
DynamoNim: "dynamonim--ac4e234",
DynamoTag: "dynamonim:MyService1",
ServiceName: "service2",
Autoscaling: &v1alpha1.Autoscaling{
Enabled: false,
},
},
},
},
......@@ -206,6 +210,7 @@ func TestGenerateDynamoNIMDeployments(t *testing.T) {
},
},
Autoscaling: &v1alpha1.Autoscaling{
Enabled: true,
MinReplicas: 1,
MaxReplicas: 5,
},
......@@ -230,6 +235,9 @@ func TestGenerateDynamoNIMDeployments(t *testing.T) {
DynamoNim: "dynamonim--ac4e234",
DynamoTag: "dynamonim:MyService2",
ServiceName: "service2",
Autoscaling: &v1alpha1.Autoscaling{
Enabled: false,
},
},
},
},
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment