/* * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package controller import ( "context" "fmt" "sort" "strings" grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" "k8s.io/apimachinery/pkg/api/errors" "github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint" "github.com/ai-dynamo/dynamo/deploy/operator/internal/discovery" "github.com/ai-dynamo/dynamo/deploy/operator/internal/secret" networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/scale" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts" commoncontroller "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common" "github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo" "github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo/epp" "github.com/ai-dynamo/dynamo/deploy/operator/internal/observability" rbacv1 "k8s.io/api/rbac/v1" gaiev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" ) type Reason string type Message string // rbacManager interface for managing RBAC resources type rbacManager interface { EnsureServiceAccountWithRBAC(ctx context.Context, targetNamespace, serviceAccountName, clusterRoleName string) error } // DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object type DynamoGraphDeploymentReconciler struct { client.Client Config *configv1alpha1.OperatorConfiguration RuntimeConfig *commoncontroller.RuntimeConfig Recorder record.EventRecorder DockerSecretRetriever dockerSecretRetriever ScaleClient scale.ScalesGetter MPISecretReplicator *secret.SecretReplicator RBACManager rbacManager } // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentscalingadapters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=grove.io,resources=podcliquesets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch // +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch // +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list // +kubebuilder:rbac:groups=inference.networking.k8s.io,resources=inferencepools,verbs=get;list;watch;create;update;patch;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // TODO(user): Modify the Reconcile function to compare the state specified by // the DynamoGraphDeployment object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { logger := log.FromContext(ctx) reason := Reason("undefined") message := Message("") state := nvidiacomv1alpha1.DGDStatePending // retrieve the CRD dynamoDeployment := &nvidiacomv1alpha1.DynamoGraphDeployment{} if err = r.Get(ctx, req.NamespacedName, dynamoDeployment); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } defer func() { // Skip status update if DGD is being deleted if !dynamoDeployment.GetDeletionTimestamp().IsZero() { logger.Info("Reconciliation done - skipping status update for deleted resource") return } if err != nil { state = nvidiacomv1alpha1.DGDStateFailed message = Message(err.Error()) logger.Error(err, "Reconciliation failed") } dynamoDeployment.SetState(state) readyStatus := metav1.ConditionFalse if state == nvidiacomv1alpha1.DGDStateSuccessful { readyStatus = metav1.ConditionTrue } // Update Ready condition dynamoDeployment.AddStatusCondition(metav1.Condition{ Type: "Ready", Status: readyStatus, Reason: string(reason), Message: string(message), LastTransitionTime: metav1.Now(), }) // Only set ObservedGeneration when reconciliation succeeded (no error), // so it accurately reflects the last successfully processed generation. if err == nil { dynamoDeployment.Status.ObservedGeneration = dynamoDeployment.Generation } updateErr := r.Status().Update(ctx, dynamoDeployment) if updateErr != nil { logger.Error(updateErr, "Unable to update the CRD status", "crd", req.NamespacedName, "state", state, "reason", reason, "message", message) // Set err to trigger requeue if err == nil { err = updateErr } } logger.Info("Reconciliation done") }() // Handle finalizer deleted, err := commoncontroller.HandleFinalizer(ctx, dynamoDeployment, r.Client, r) if err != nil { logger.Error(err, "failed to handle the finalizer") reason = "failed_to_handle_the_finalizer" return ctrl.Result{}, err } if deleted { return ctrl.Result{}, nil } if r.supportsManagedRollingUpdate(dynamoDeployment) { if err = r.initializeWorkerHashIfNeeded(ctx, dynamoDeployment); err != nil { logger.Error(err, "Failed to initialize worker hash") reason = "failed_to_initialize_worker_hash" return ctrl.Result{}, err } if r.isRollingUpdateInProgress(dynamoDeployment) || r.shouldTriggerRollingUpdate(dynamoDeployment) { if err = r.reconcileRollingUpdate(ctx, dynamoDeployment); err != nil { logger.Error(err, "Failed to reconcile rolling update") state = nvidiacomv1alpha1.DGDStateFailed reason = Reason("RollingUpdateFailed") message = Message(err.Error()) return ctrl.Result{}, err } } } else { // For unsupported pathways, log if a rolling update would have been triggered if r.shouldTriggerRollingUpdate(dynamoDeployment) { logger.Info("Worker spec change detected but rolling update not supported for this pathway", "isGrove", r.isGrovePathway(dynamoDeployment), "hasMultinode", dynamoDeployment.HasAnyMultinodeService()) r.Recorder.Event(dynamoDeployment, corev1.EventTypeWarning, "RollingUpdateNotSupported", "Worker spec changed but custom rolling updates are not supported for Grove/multinode deployments") // Update the hash to prevent repeated warnings hash := dynamo.ComputeDGDWorkersSpecHash(dynamoDeployment) r.setCurrentWorkerHash(dynamoDeployment, hash) if updateErr := r.Update(ctx, dynamoDeployment); updateErr != nil { logger.Error(updateErr, "Failed to update worker hash for unsupported pathway") } } } reconcileResult, err := r.reconcileResources(ctx, dynamoDeployment) state = reconcileResult.State reason = reconcileResult.Reason message = reconcileResult.Message dynamoDeployment.Status.Services = reconcileResult.ServiceStatus dynamoDeployment.Status.Restart = reconcileResult.RestartStatus if err != nil { logger.Error(err, "failed to reconcile the resources") reason = "failed_to_reconcile_the_resources" return ctrl.Result{}, err } // Override state based on rolling update status if a rolling update is in progress if dynamoDeployment.Status.RollingUpdate != nil { switch dynamoDeployment.Status.RollingUpdate.Phase { case nvidiacomv1alpha1.RollingUpdatePhaseCompleted: // Keep the reconcileResult state (should be Ready if resources are ready) case nvidiacomv1alpha1.RollingUpdatePhasePending, nvidiacomv1alpha1.RollingUpdatePhaseInProgress: // Rolling update in progress - resources are being transitioned if state != nvidiacomv1alpha1.DGDStateFailed { state = nvidiacomv1alpha1.DGDStatePending reason = "rolling_update_in_progress" message = "Rolling update in progress" } } } return ctrl.Result{}, nil } type Resource interface { IsReady() (ready bool, reason string) GetName() string GetServiceStatuses() map[string]nvidiacomv1alpha1.ServiceReplicaStatus } type ReconcileResult struct { State nvidiacomv1alpha1.DGDState Reason Reason Message Message ServiceStatus map[string]nvidiacomv1alpha1.ServiceReplicaStatus RestartStatus *nvidiacomv1alpha1.RestartStatus } func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) { logger := log.FromContext(ctx) // Ensure planner RBAC exists in cluster-wide mode if r.Config.Namespace.Restricted == "" { if r.RBACManager == nil { return ReconcileResult{}, fmt.Errorf("RBAC manager not initialized in cluster-wide mode") } if r.Config.RBAC.PlannerClusterRoleName == "" { return ReconcileResult{}, fmt.Errorf("planner ClusterRole name is required in cluster-wide mode") } if err := r.RBACManager.EnsureServiceAccountWithRBAC( ctx, dynamoDeployment.Namespace, consts.PlannerServiceAccountName, r.Config.RBAC.PlannerClusterRoleName, ); err != nil { logger.Error(err, "Failed to ensure planner RBAC") return ReconcileResult{}, fmt.Errorf("failed to ensure planner RBAC: %w", err) } // Ensure EPP RBAC exists in cluster-wide mode if EPP service is present if dynamoDeployment.HasEPPService() { if r.Config.RBAC.EPPClusterRoleName == "" { return ReconcileResult{}, fmt.Errorf("EPP ClusterRole name is required in cluster-wide mode when EPP service is present") } if err := r.RBACManager.EnsureServiceAccountWithRBAC( ctx, dynamoDeployment.Namespace, consts.EPPServiceAccountName, r.Config.RBAC.EPPClusterRoleName, ); err != nil { logger.Error(err, "Failed to ensure EPP RBAC") return ReconcileResult{}, fmt.Errorf("failed to ensure EPP RBAC: %w", err) } } } // Reconcile top-level PVCs first err := r.reconcilePVCs(ctx, dynamoDeployment) if err != nil { logger.Error(err, "Failed to reconcile top-level PVCs") return ReconcileResult{}, fmt.Errorf("failed to reconcile top-level PVCs: %w", err) } // Reconcile checkpoints for services with checkpointing enabled checkpointStatuses, checkpointInfos, err := r.reconcileCheckpoints(ctx, dynamoDeployment) if err != nil { logger.Error(err, "Failed to reconcile checkpoints") return ReconcileResult{}, fmt.Errorf("failed to reconcile checkpoints: %w", err) } dynamoDeployment.Status.Checkpoints = checkpointStatuses // Reconcile DynamoGraphDeploymentScalingAdapters for each service err = r.reconcileScalingAdapters(ctx, dynamoDeployment) if err != nil { logger.Error(err, "Failed to reconcile scaling adapters") return ReconcileResult{}, fmt.Errorf("failed to reconcile scaling adapters: %w", err) } // Reconcile the SA, Role and RoleBinding if k8s discovery is enabled err = r.reconcileK8sDiscoveryResources(ctx, dynamoDeployment) if err != nil { logger.Error(err, "Failed to reconcile K8s discovery resources") return ReconcileResult{}, fmt.Errorf("failed to reconcile K8s discovery resources: %w", err) } // Reconcile EPP resources (ConfigMaps, Services, InferencePools) if EPP service exists err = r.reconcileEPPResources(ctx, dynamoDeployment) if err != nil { logger.Error(err, "Failed to reconcile EPP resources") return ReconcileResult{}, fmt.Errorf("failed to reconcile EPP resources: %w", err) } // Determine if any service is multinode hasMultinode := dynamoDeployment.HasAnyMultinodeService() // Always ensure MPI SSH secret is available in this namespace if r.MPISecretReplicator != nil { err := r.MPISecretReplicator.Replicate(ctx, dynamoDeployment.Namespace) if err != nil { logger.Error(err, "Failed to replicate MPI secret", "namespace", dynamoDeployment.Namespace) return ReconcileResult{}, fmt.Errorf("failed to replicate MPI secret: %w", err) } } // return error early if Grove and LWS is not available for multinode if !r.isGrovePathway(dynamoDeployment) && hasMultinode && !r.RuntimeConfig.LWSEnabled { err := fmt.Errorf("no multinode orchestrator available") logger.Error(err, err.Error(), "hasMultinode", hasMultinode, "lwsEnabled", r.RuntimeConfig.LWSEnabled) return ReconcileResult{}, fmt.Errorf("failed to reconcile Dynamo components deployments: %w", err) } restartStatus := r.computeRestartStatus(ctx, dynamoDeployment) restartState := dynamo.DetermineRestartState(dynamoDeployment, restartStatus) var result ReconcileResult if r.isGrovePathway(dynamoDeployment) { logger.Info("Reconciling Grove resources", "hasMultinode", hasMultinode, "lwsEnabled", r.RuntimeConfig.LWSEnabled) result, err = r.reconcileGroveResources(ctx, dynamoDeployment, restartState, checkpointInfos) } else { logger.Info("Reconciling Dynamo components deployments", "hasMultinode", hasMultinode, "lwsEnabled", r.RuntimeConfig.LWSEnabled) result, err = r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment, restartState) } if err != nil { logger.Error(err, "Failed to reconcile Dynamo components deployments") return ReconcileResult{}, fmt.Errorf("failed to reconcile Dynamo components deployments: %w", err) } result.RestartStatus = restartStatus return result, nil } func (r *DynamoGraphDeploymentReconciler) isGrovePathway(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) bool { // Orchestrator selection via single boolean annotation: nvidia.com/enable-grove // Unset or not "false": Grove if available; else component mode // "false": component mode (multinode -> LWS; single-node -> standard) enableGrove := true if dgd.Annotations != nil && strings.ToLower(dgd.Annotations[consts.KubeAnnotationEnableGrove]) == consts.KubeLabelValueFalse { enableGrove = false } return enableGrove && r.RuntimeConfig.GroveEnabled } func (r *DynamoGraphDeploymentReconciler) getUpdatedInProgress(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, inProgress []string) []string { if r.isGrovePathway(dgd) { return r.getUpdatedInProgressForGrove(ctx, dgd, inProgress) } return r.getUpdatedInProgressForComponent(ctx, dgd, inProgress) } // getUpgdatedInProgressForGrove checks which services are still in progress. func (r *DynamoGraphDeploymentReconciler) getUpdatedInProgressForGrove(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, inProgress []string) []string { logger := log.FromContext(ctx) pcs := &grovev1alpha1.PodCliqueSet{} err := r.Client.Get(ctx, types.NamespacedName{Name: dgd.Name, Namespace: dgd.Namespace}, pcs) if err != nil { logger.Error(err, "failed to get PodCliqueSet") return inProgress } if pcs.Status.ObservedGeneration == nil { logger.Info("PodCliqueSet observedGeneration is nil", "name", dgd.Name) return inProgress } if *pcs.Status.ObservedGeneration < pcs.Generation { logger.Info("PodCliqueSet not yet reconciled", "name", dgd.Name, "generation", pcs.Generation, "observedGeneration", *pcs.Status.ObservedGeneration) return inProgress } updatedInProgress := make([]string, 0, len(inProgress)) for _, serviceName := range inProgress { component := dgd.Spec.Services[serviceName] resourceName := fmt.Sprintf("%s-0-%s", dgd.Name, strings.ToLower(serviceName)) var isReady bool var reason string if component.GetNumberOfNodes() > 1 { isReady, reason, _ = dynamo.CheckPCSGReady(ctx, r.Client, resourceName, dgd.Namespace, logger) } else { isReady, reason, _ = dynamo.CheckPodCliqueReady(ctx, r.Client, resourceName, dgd.Namespace, logger) } if !isReady { logger.V(1).Info("service not ready", "serviceName", serviceName, "resourceName", resourceName, "reason", reason) updatedInProgress = append(updatedInProgress, serviceName) } } return updatedInProgress } func isRestartAlreadyProcessed(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) bool { if dgd.Spec.Restart == nil || dgd.Spec.Restart.ID == "" { return true } if dgd.Status.Restart == nil || dgd.Status.Restart.ObservedID == "" { return false } if dgd.Spec.Restart.ID == dgd.Status.Restart.ObservedID && (dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseCompleted || dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseFailed || dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseSuperseded) { return true } return false } // scaleGroveResource scales a Grove resource using the generic scaling function func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context, resourceName, namespace string, newReplicas int32, resourceType string) error { logger := log.FromContext(ctx) // Determine the GroupVersionResource based on resource type var gvr schema.GroupVersionResource switch resourceType { case "PodClique": gvr = consts.PodCliqueGVR case "PodCliqueScalingGroup": gvr = consts.PodCliqueScalingGroupGVR default: return fmt.Errorf("unsupported Grove resource type: %s", resourceType) } // Use the generic scaling function err := commoncontroller.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas) if err != nil { if errors.IsNotFound(err) { // Resource doesn't exist yet - this is normal during initial creation when Grove is still creating the resources asynchronously logger.V(1).Info("Grove resource not found yet, skipping scaling for now - will retry on next reconciliation", "gvr", gvr, "name", resourceName, "namespace", namespace) return nil } } return err } func (r *DynamoGraphDeploymentReconciler) reconcileGrovePodCliqueSet(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, restartState *dynamo.RestartState, checkpointInfos map[string]*checkpoint.CheckpointInfo) (*commoncontroller.Resource, error) { logger := log.FromContext(ctx) existingRestartAnnotations, err := r.getExistingRestartAnnotationsPCS(ctx, dynamoDeployment) if err != nil { logger.Error(err, "failed to get existing restart annotations") return nil, fmt.Errorf("failed to get existing restart annotations: %w", err) } // generate the dynamoComponentsDeployments from the config grovePodCliqueSet, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.RuntimeConfig, r.DockerSecretRetriever, restartState, existingRestartAnnotations, checkpointInfos) if err != nil { logger.Error(err, "failed to generate the Grove GangSet") return nil, fmt.Errorf("failed to generate the Grove GangSet: %w", err) } _, syncedGrovePodCliqueSet, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*grovev1alpha1.PodCliqueSet, bool, error) { return grovePodCliqueSet, false, nil }) if err != nil { logger.Error(err, "failed to sync the Grove GangSet") return nil, fmt.Errorf("failed to sync the Grove GangSet: %w", err) } syncedGrovePodCliqueSetAsResource, err := commoncontroller.NewResourceWithServiceStatuses( syncedGrovePodCliqueSet, func() (bool, string, map[string]nvidiacomv1alpha1.ServiceReplicaStatus) { // Grove readiness: all underlying PodCliques and PodCliqueScalingGroups have replicas == availableReplicas allComponentsReady, reason, serviceStatuses := dynamo.GetComponentReadinessAndServiceReplicaStatuses(ctx, r.Client, dynamoDeployment) if !allComponentsReady { return false, reason, serviceStatuses } return true, "", serviceStatuses }, ) if err != nil { logger.Error(err, "failed to create the Grove PodClique Set resource") return nil, fmt.Errorf("failed to create the Grove PodClique Set resource: %w", err) } return syncedGrovePodCliqueSetAsResource, nil } func (r *DynamoGraphDeploymentReconciler) getExistingRestartAnnotationsPCS(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) (map[string]string, error) { restartAnnotations := make(map[string]string) pcs := &grovev1alpha1.PodCliqueSet{} err := r.Client.Get(ctx, types.NamespacedName{Name: dgd.Name, Namespace: dgd.Namespace}, pcs) if err != nil && !errors.IsNotFound(err) { return nil, fmt.Errorf("failed to get PodCliqueSet: %w", err) } if errors.IsNotFound(err) { return restartAnnotations, nil } for _, clique := range pcs.Spec.Template.Cliques { if clique.Annotations != nil { if timestamp, ok := clique.Annotations[consts.RestartAnnotation]; ok { if serviceName, ok := clique.Labels[consts.KubeLabelDynamoComponent]; ok { restartAnnotations[serviceName] = timestamp } } } } return restartAnnotations, nil } // reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error { logger := log.FromContext(ctx) logger.V(1).Info("Reconciling Grove scaling operations") replicaIndex := 0 for serviceName, component := range dynamoDeployment.Spec.Services { // Skip if replicas are not specified if component.Replicas == nil { continue } numberOfNodes := component.GetNumberOfNodes() isMultinode := numberOfNodes > 1 if isMultinode { // Scale PodCliqueScalingGroup for multinode services // Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName} resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName)) err := r.scaleGroveResource(ctx, resourceName, dynamoDeployment.Namespace, *component.Replicas, "PodCliqueScalingGroup") if err != nil { logger.Error(err, "Failed to scale PodCliqueScalingGroup", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas) return fmt.Errorf("failed to scale PodCliqueScalingGroup %s: %w", resourceName, err) } } else { // Scale individual PodClique for single-node services // Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName} resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName)) err := r.scaleGroveResource(ctx, resourceName, dynamoDeployment.Namespace, *component.Replicas, "PodClique") if err != nil { logger.Error(err, "Failed to scale PodClique", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas) return fmt.Errorf("failed to scale PodClique %s: %w", resourceName, err) } } } logger.V(1).Info("Successfully reconciled Grove scaling operations") return nil } func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, restartState *dynamo.RestartState, checkpointInfos map[string]*checkpoint.CheckpointInfo) (ReconcileResult, error) { logger := log.FromContext(ctx) grovePodCliqueSetAsResource, err := r.reconcileGrovePodCliqueSet(ctx, dynamoDeployment, restartState, checkpointInfos) if err != nil { logger.Error(err, "failed to reconcile the Grove PodClique Set") return ReconcileResult{}, fmt.Errorf("failed to reconcile the Grove PodClique Set: %w", err) } // Handle Grove scaling operations after structural changes if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil { logger.Error(err, "failed to reconcile Grove scaling") return ReconcileResult{}, fmt.Errorf("failed to reconcile Grove scaling: %w", err) } // Reconcile headless services for model endpoint discovery if err := dynamo.ReconcileModelServicesForComponents( ctx, r, dynamoDeployment, dynamoDeployment.Spec.Services, dynamoDeployment.Namespace, ); err != nil { logger.Error(err, "failed to reconcile model services") return ReconcileResult{}, fmt.Errorf("failed to reconcile model services: %w", err) } resources := []Resource{grovePodCliqueSetAsResource} for componentName, component := range dynamoDeployment.Spec.Services { // if k8s discovery is enabled, create a service for each component // else, only create for the frontend component isK8sDiscoveryEnabled := commoncontroller.IsK8sDiscoveryEnabled(r.Config.Discovery.Backend, dynamoDeployment.Annotations) if isK8sDiscoveryEnabled || component.ComponentType == consts.ComponentTypeFrontend { if component.DynamoNamespace == nil { return ReconcileResult{}, fmt.Errorf("expected component %s to have a dynamoNamespace", componentName) } mainComponentService, err := dynamo.GenerateComponentService(dynamo.ComponentServiceParams{ ServiceName: dynamo.GetDCDResourceName(dynamoDeployment, componentName, ""), Namespace: dynamoDeployment.Namespace, ComponentType: component.ComponentType, DynamoNamespace: *component.DynamoNamespace, ComponentName: componentName, Labels: component.Labels, IsK8sDiscovery: isK8sDiscoveryEnabled, }) if err != nil { logger.Error(err, "failed to generate the main component service") return ReconcileResult{}, fmt.Errorf("failed to generate the main component service: %w", err) } _, syncedMainComponentService, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.Service, bool, error) { return mainComponentService, false, nil }) if err != nil { logger.Error(err, "failed to sync the main component service") return ReconcileResult{}, fmt.Errorf("failed to sync the main component service: %w", err) } if syncedMainComponentService != nil { mainComponentServiceAsResource, err := commoncontroller.NewResource(syncedMainComponentService, func() (bool, string) { return true, "" }) if err != nil { return ReconcileResult{}, fmt.Errorf("failed to sync the main component service: %w", err) } resources = append(resources, mainComponentServiceAsResource) } } if component.ComponentType == consts.ComponentTypeFrontend { // generate the main component ingress ingressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.Ingress) if component.Ingress != nil { ingressSpec = *component.Ingress } mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDCDResourceName(dynamoDeployment, componentName, ""), dynamoDeployment.Namespace, ingressSpec) _, syncedMainComponentIngress, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) { if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil { logger.Info("Ingress is not enabled") return mainComponentIngress, true, nil } return mainComponentIngress, false, nil }) if err != nil { logger.Error(err, "failed to sync the main component ingress") return ReconcileResult{}, fmt.Errorf("failed to sync the main component ingress: %w", err) } if syncedMainComponentIngress != nil { mainComponentIngressAsResource, err := commoncontroller.NewResource(syncedMainComponentIngress, func() (bool, string) { return true, "" }) if err != nil { return ReconcileResult{}, fmt.Errorf("failed to create the main component ingress resource: %w", err) } resources = append(resources, mainComponentIngressAsResource) } // generate the main component virtual service if r.Config.Ingress.UseVirtualService() { mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDCDResourceName(dynamoDeployment, componentName, ""), dynamoDeployment.Namespace, ingressSpec) _, syncedMainComponentVirtualService, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) { if !ingressSpec.IsVirtualServiceEnabled() { logger.Info("VirtualService is not enabled") return mainComponentVirtualService, true, nil } return mainComponentVirtualService, false, nil }) if err != nil { logger.Error(err, "failed to sync the main component virtual service") return ReconcileResult{}, fmt.Errorf("failed to sync the main component virtual service: %w", err) } if syncedMainComponentVirtualService != nil { mainComponentVirtualServiceAsResource, err := commoncontroller.NewResource(syncedMainComponentVirtualService, func() (bool, string) { return true, "" }) if err != nil { return ReconcileResult{}, fmt.Errorf("failed to create the main component virtual service resource: %w", err) } resources = append(resources, mainComponentVirtualServiceAsResource) } } } } // Check resource readiness result := r.checkResourcesReadiness(resources) return result, nil } // isNewRestartRequest checks if the current spec.restart.id represents a new restart request func isNewRestartRequest(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) bool { if dgd.Status.Restart == nil || dgd.Status.Restart.ObservedID == "" || dgd.Spec.Restart.ID == "" { return true } return dgd.Spec.Restart.ID != dgd.Status.Restart.ObservedID } // computeParallelRestartStatus handles parallel restart where all services restart together. func (r *DynamoGraphDeploymentReconciler) computeParallelRestartStatus( ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, ) *nvidiacomv1alpha1.RestartStatus { logger := log.FromContext(ctx) specID := dgd.Spec.Restart.ID var servicesToCheck []string if isNewRestartRequest(dgd) { logger.Info("New restart request detected, resetting to all services", "specID", specID) servicesToCheck = make([]string, 0, len(dgd.Spec.Services)) for serviceName := range dgd.Spec.Services { servicesToCheck = append(servicesToCheck, serviceName) } // Sort for deterministic output sort.Strings(servicesToCheck) // For a new restart request with services, immediately return Restarting phase without checking readiness. if len(servicesToCheck) > 0 { return &nvidiacomv1alpha1.RestartStatus{ ObservedID: specID, Phase: nvidiacomv1alpha1.RestartPhaseRestarting, InProgress: servicesToCheck, } } // If no services, fall through to the empty check below } else if dgd.Status.Restart != nil && len(dgd.Status.Restart.InProgress) > 0 { // Continuing existing restart: use current InProgress list servicesToCheck = dgd.Status.Restart.InProgress } else { // No in-progress list but same ID - use all services servicesToCheck = make([]string, 0, len(dgd.Spec.Services)) for serviceName := range dgd.Spec.Services { servicesToCheck = append(servicesToCheck, serviceName) } // Sort for deterministic output sort.Strings(servicesToCheck) } if len(servicesToCheck) == 0 { return &nvidiacomv1alpha1.RestartStatus{ ObservedID: specID, Phase: nvidiacomv1alpha1.RestartPhaseCompleted, } } updatedInProgress := r.getUpdatedInProgress(ctx, dgd, servicesToCheck) if len(updatedInProgress) == 0 { logger.Info("Restart completed for all services") return &nvidiacomv1alpha1.RestartStatus{ ObservedID: specID, Phase: nvidiacomv1alpha1.RestartPhaseCompleted, } } return &nvidiacomv1alpha1.RestartStatus{ ObservedID: specID, Phase: nvidiacomv1alpha1.RestartPhaseRestarting, InProgress: updatedInProgress, } } // computeSequentialRestartStatus handles sequential restart where services restart one at a time. func (r *DynamoGraphDeploymentReconciler) computeSequentialRestartStatus( ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, order []string, ) *nvidiacomv1alpha1.RestartStatus { logger := log.FromContext(ctx) specID := dgd.Spec.Restart.ID // Get the current service being restarted from previous status var currentService string if isNewRestartRequest(dgd) { // New restart request: start fresh from the first service logger.Info("New restart request detected, starting from first service", "specID", specID, "firstService", order[0]) currentService = order[0] return &nvidiacomv1alpha1.RestartStatus{ ObservedID: specID, Phase: nvidiacomv1alpha1.RestartPhaseRestarting, InProgress: []string{currentService}, } } if dgd.Status.Restart != nil && len(dgd.Status.Restart.InProgress) > 0 { currentService = dgd.Status.Restart.InProgress[0] // For sequential, there's only one } // If no current service, we're starting fresh - use the first service if currentService == "" { currentService = order[0] return &nvidiacomv1alpha1.RestartStatus{ ObservedID: specID, Phase: nvidiacomv1alpha1.RestartPhaseRestarting, InProgress: []string{currentService}, } } // Check if the current service is fully updated updatedInProgress := r.getUpdatedInProgress(ctx, dgd, []string{currentService}) if len(updatedInProgress) > 0 { // Still restarting logger.Info("Service restart not completed", "service", currentService, "updatedInProgress", updatedInProgress) return &nvidiacomv1alpha1.RestartStatus{ ObservedID: specID, Phase: nvidiacomv1alpha1.RestartPhaseRestarting, InProgress: []string{currentService}, } } // Current service is fully updated - it's done logger.Info("Service restart completed", "service", currentService) // Find the next service nextService := getNextServiceInOrder(order, currentService) if nextService == "" { // No more services, restart is complete logger.Info("Restart completed for all services") return &nvidiacomv1alpha1.RestartStatus{ ObservedID: specID, Phase: nvidiacomv1alpha1.RestartPhaseCompleted, } } // Move to the next service logger.Info("Starting next service restart", "service", nextService) return &nvidiacomv1alpha1.RestartStatus{ ObservedID: specID, Phase: nvidiacomv1alpha1.RestartPhaseRestarting, InProgress: []string{nextService}, } } // getNextServiceInOrder returns the service after the given service in the order, or empty string if none. func getNextServiceInOrder(order []string, currentService string) string { for i, svc := range order { if svc == currentService && i+1 < len(order) { return order[i+1] } } return "" } func (r *DynamoGraphDeploymentReconciler) computeRestartStatus(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) *nvidiacomv1alpha1.RestartStatus { // No restart requested if dgd.Spec.Restart == nil || dgd.Spec.Restart.ID == "" { // Preserve existing terminal status if dgd.Status.Restart != nil && (dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseCompleted || dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseFailed || dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseSuperseded) { return dgd.Status.Restart } return nil } // If restart was already processed (completed, failed, or superseded), return existing status if isRestartAlreadyProcessed(dgd) { return dgd.Status.Restart } // Supersede restart if a rolling update is in progress if r.isRollingUpdateInProgress(dgd) { r.Recorder.Eventf(dgd, corev1.EventTypeWarning, "RestartSuperseded", "Restart %s superseded by rolling update", dgd.Spec.Restart.ID) return &nvidiacomv1alpha1.RestartStatus{ ObservedID: dgd.Spec.Restart.ID, Phase: nvidiacomv1alpha1.RestartPhaseSuperseded, } } order := dynamo.GetRestartOrder(dgd) if dynamo.IsParallelRestart(dgd) { return r.computeParallelRestartStatus(ctx, dgd) } return r.computeSequentialRestartStatus(ctx, dgd, order) } // checkComponentServiceFullyUpdated checks if a DynamoComponentDeployment is fully updated. func (r *DynamoGraphDeploymentReconciler) checkComponentServiceFullyUpdated(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, serviceName string) (bool, string) { resourceName := dynamo.GetDCDResourceName(dgd, serviceName, r.getCurrentWorkerHash(dgd)) return checkDCDReady(ctx, r.Client, resourceName, dgd.Namespace) } // checkDCDReady checks if a DynamoComponentDeployment has completed its restart. // A DCD is considered fully updated when: // 1. The DCD controller has processed the latest spec (observedGeneration >= generation) // 2. The Available condition is set to True func checkDCDReady(ctx context.Context, client client.Client, resourceName, namespace string) (bool, string) { logger := log.FromContext(ctx) dcd := &nvidiacomv1alpha1.DynamoComponentDeployment{} err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, dcd) if err != nil { if errors.IsNotFound(err) { logger.V(2).Info("DynamoComponentDeployment not found", "resourceName", resourceName) return false, "resource not found" } logger.V(1).Info("Failed to get DynamoComponentDeployment", "error", err, "resourceName", resourceName) return false, fmt.Sprintf("get error: %v", err) } // Log the DCD status for debugging logger.Info("CheckDCDFullyUpdated", "resourceName", resourceName, "generation", dcd.Generation, "observedGeneration", dcd.Status.ObservedGeneration, "conditionCount", len(dcd.Status.Conditions)) if dcd.Status.ObservedGeneration < dcd.Generation { logger.V(1).Info("DynamoComponentDeployment spec not yet processed", "resourceName", resourceName, "generation", dcd.Generation, "observedGeneration", dcd.Status.ObservedGeneration) return false, fmt.Sprintf("spec not yet processed: generation=%d, observedGeneration=%d", dcd.Generation, dcd.Status.ObservedGeneration) } // Check if the Available condition is True for _, condition := range dcd.Status.Conditions { if condition.Type == nvidiacomv1alpha1.DynamoGraphDeploymentConditionTypeAvailable { if condition.Status == metav1.ConditionTrue { return true, "" } logger.V(1).Info("DynamoComponentDeployment not available", "resourceName", resourceName, "status", condition.Status, "reason", condition.Reason, "message", condition.Message) return false, fmt.Sprintf("not available: %s", condition.Message) } } logger.V(1).Info("DynamoComponentDeployment missing Available condition", "resourceName", resourceName) return false, "Available condition not found" } // getUpdatedInProgressForComponent checks which services are still in progress for DCD pathway. func (r *DynamoGraphDeploymentReconciler) getUpdatedInProgressForComponent(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, inProgress []string) []string { logger := log.FromContext(ctx) updatedInProgress := make([]string, 0, len(inProgress)) for _, serviceName := range inProgress { isFullyUpdated, reason := r.checkComponentServiceFullyUpdated(ctx, dgd, serviceName) if !isFullyUpdated { logger.V(1).Info("service not fully updated", "serviceName", serviceName, "reason", reason) updatedInProgress = append(updatedInProgress, serviceName) } } return updatedInProgress } func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) ReconcileResult { // Sort resources by name to ensure deterministic ordering sort.Slice(resources, func(i, j int) bool { return resources[i].GetName() < resources[j].GetName() }) var notReadyReasons []string notReadyResources := []string{} serviceStatuses := make(map[string]nvidiacomv1alpha1.ServiceReplicaStatus) for _, resource := range resources { ready, reason := resource.IsReady() resourceServiceStatuses := resource.GetServiceStatuses() for serviceName, serviceStatus := range resourceServiceStatuses { serviceStatuses[serviceName] = serviceStatus } if !ready { notReadyResources = append(notReadyResources, resource.GetName()) notReadyReasons = append(notReadyReasons, fmt.Sprintf("%s: %s", resource.GetName(), reason)) } } if len(notReadyResources) == 0 { return ReconcileResult{ State: nvidiacomv1alpha1.DGDStateSuccessful, Reason: "all_resources_are_ready", Message: Message("All resources are ready"), ServiceStatus: serviceStatuses, } } return ReconcileResult{ State: nvidiacomv1alpha1.DGDStatePending, Reason: "some_resources_are_not_ready", Message: Message(fmt.Sprintf("Resources not ready: %s", strings.Join(notReadyReasons, "; "))), ServiceStatus: serviceStatuses, } } func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, restartState *dynamo.RestartState) (ReconcileResult, error) { resources := []Resource{} logger := log.FromContext(ctx) defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.Ingress) rollingUpdateCtx := r.buildRollingUpdateContext(ctx, dynamoDeployment) existingRestartAnnotations, err := r.getExistingRestartAnnotationsDCD(ctx, dynamoDeployment) if err != nil { logger.Error(err, "failed to get existing restart annotations") return ReconcileResult{}, fmt.Errorf("failed to get existing restart annotations: %w", err) } if rollingUpdateCtx.InProgress() { logger.Info("Rolling update in progress", "newWorkerHash", rollingUpdateCtx.NewWorkerHash, "oldWorkerReplicas", rollingUpdateCtx.OldWorkerReplicas) } // Generate all DCDs (handles both normal and rolling update cases) dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments( ctx, dynamoDeployment, &defaultIngressSpec, restartState, existingRestartAnnotations, rollingUpdateCtx, ) if err != nil { logger.Error(err, "failed to generate the DynamoComponentsDeployments") return ReconcileResult{}, fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err) } // Sync all generated DCDs for key, dcd := range dynamoComponentsDeployments { logger.Info("Reconciling DynamoComponentDeployment", "key", key, "name", dcd.Name) _, syncedDCD, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) { return dcd, false, nil }) if err != nil { logger.Error(err, "failed to sync the DynamoComponentDeployment", "name", dcd.Name) return ReconcileResult{}, fmt.Errorf("failed to sync the DynamoComponentDeployment: %w", err) } resources = append(resources, syncedDCD) } // During rolling update, scale old worker DCDs via direct patching. // This is done separately from DCD generation to avoid overwriting the old spec // with the new spec (which would trigger an unwanted rolling update on old workers). if rollingUpdateCtx.InProgress() { if err := r.scaleOldWorkerDCDs(ctx, dynamoDeployment, rollingUpdateCtx); err != nil { logger.Error(err, "failed to scale old worker DCDs") return ReconcileResult{}, fmt.Errorf("failed to scale old worker DCDs: %w", err) } } // Check resource readiness result := r.checkResourcesReadiness(resources) // During rolling updates, aggregate old worker service statuses into the result // so that Replicas, ReadyReplicas, etc. reflect the total across old and new DCDs. if rollingUpdateCtx.InProgress() { oldWorkerStatuses, err := r.aggregateOldWorkerServiceStatuses(ctx, dynamoDeployment, rollingUpdateCtx) if err != nil { logger.Error(err, "failed to aggregate old worker service statuses") // Non-fatal: continue with partial status } else if len(oldWorkerStatuses) > 0 { mergeWorkerServiceStatuses(result.ServiceStatus, oldWorkerStatuses) } } return result, nil } func (r *DynamoGraphDeploymentReconciler) getExistingRestartAnnotationsDCD(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) (map[string]string, error) { logger := log.FromContext(ctx) computedHash := dynamo.ComputeDGDWorkersSpecHash(dgd) restartAnnotations := make(map[string]string) for serviceName := range dgd.Spec.Services { dcdName := dynamo.GetDCDResourceName(dgd, serviceName, computedHash) existingDCD := &nvidiacomv1alpha1.DynamoComponentDeployment{} err := r.Get(ctx, types.NamespacedName{Name: dcdName, Namespace: dgd.Namespace}, existingDCD) if err != nil && !errors.IsNotFound(err) { return nil, fmt.Errorf("failed to get DynamoComponentDeployment: %w", err) } if errors.IsNotFound(err) { logger.Info("DynamoComponentDeployment not found", "dcdName", dcdName) continue } if existingDCD.Spec.Annotations != nil { if restartAt := existingDCD.Spec.Annotations[consts.RestartAnnotation]; restartAt != "" { restartAnnotations[serviceName] = restartAt } } } return restartAnnotations, nil } // reconcilePVC reconciles a single top-level PVC defined in the DynamoGraphDeployment spec func (r *DynamoGraphDeploymentReconciler) reconcilePVC(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, pvcName string, pvcConfig nvidiacomv1alpha1.PVC) (*corev1.PersistentVolumeClaim, error) { logger := log.FromContext(ctx) pvc := &corev1.PersistentVolumeClaim{} pvcNamespacedName := types.NamespacedName{Name: pvcName, Namespace: dynamoDeployment.Namespace} err := r.Get(ctx, pvcNamespacedName, pvc) if err != nil && client.IgnoreNotFound(err) != nil { logger.Error(err, "Unable to retrieve top-level PVC", "pvcName", pvcName) return nil, err } // If PVC does not exist, create a new one if err != nil { if pvcConfig.Create == nil || !*pvcConfig.Create { logger.Error(err, "Top-level PVC does not exist and create is not enabled", "pvcName", pvcName) return nil, err } pvc = constructPVC(dynamoDeployment, pvcConfig) if err := controllerutil.SetControllerReference(dynamoDeployment, pvc, r.Client.Scheme()); err != nil { logger.Error(err, "Failed to set controller reference for top-level PVC", "pvcName", pvcName) return nil, err } err = r.Create(ctx, pvc) if err != nil { logger.Error(err, "Failed to create top-level PVC", "pvcName", pvcName) return nil, err } logger.Info("Top-level PVC created", "pvcName", pvcName, "namespace", dynamoDeployment.Namespace) } return pvc, nil } func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error { logger := log.FromContext(ctx) if !commoncontroller.IsK8sDiscoveryEnabled(r.Config.Discovery.Backend, dynamoDeployment.Annotations) { logger.Info("K8s discovery is not enabled") return nil } logger.Info("K8s discovery is enabled") serviceAccount := discovery.GetK8sDiscoveryServiceAccount(dynamoDeployment.Name, dynamoDeployment.Namespace) _, _, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.ServiceAccount, bool, error) { return serviceAccount, false, nil }) if err != nil { logger.Error(err, "failed to sync the k8s discovery service account") return fmt.Errorf("failed to sync the k8s discovery service account: %w", err) } role := discovery.GetK8sDiscoveryRole(dynamoDeployment.Name, dynamoDeployment.Namespace) _, _, err = commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*rbacv1.Role, bool, error) { return role, false, nil }) if err != nil { logger.Error(err, "failed to sync the k8s discovery role") return fmt.Errorf("failed to sync the k8s discovery role: %w", err) } roleBinding := discovery.GetK8sDiscoveryRoleBinding(dynamoDeployment.Name, dynamoDeployment.Namespace) _, _, err = commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*rbacv1.RoleBinding, bool, error) { return roleBinding, false, nil }) if err != nil { logger.Error(err, "failed to sync the k8s discovery role binding") return fmt.Errorf("failed to sync the k8s discovery role binding: %w", err) } return nil } // reconcilePVCs reconciles all top-level PVCs defined in the DynamoGraphDeployment spec func (r *DynamoGraphDeploymentReconciler) reconcilePVCs(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error { logger := log.FromContext(ctx) if dynamoDeployment.Spec.PVCs == nil { return nil } for _, pvcConfig := range dynamoDeployment.Spec.PVCs { if pvcConfig.Name == nil || *pvcConfig.Name == "" { logger.Error(nil, "PVC not reconcilable: name is required", "pvcConfig", pvcConfig) continue } pvcName := *pvcConfig.Name logger.Info("Reconciling top-level PVC", "pvcName", pvcName, "namespace", dynamoDeployment.Namespace) _, err := r.reconcilePVC(ctx, dynamoDeployment, pvcName, pvcConfig) if err != nil { return err } } return nil } // reconcileCheckpoints reconciles Checkpoint CRs for services with checkpointing enabled // For Auto mode, it creates Checkpoint CRs if they don't exist // Returns a map of service names to checkpoint status and a map of service names to checkpoint info func (r *DynamoGraphDeploymentReconciler) reconcileCheckpoints(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (map[string]nvidiacomv1alpha1.ServiceCheckpointStatus, map[string]*checkpoint.CheckpointInfo, error) { logger := log.FromContext(ctx) statuses := make(map[string]nvidiacomv1alpha1.ServiceCheckpointStatus) checkpointInfos := make(map[string]*checkpoint.CheckpointInfo) for serviceName, component := range dynamoDeployment.Spec.Services { if component.Checkpoint == nil || !component.Checkpoint.Enabled { continue } logger.Info("Reconciling checkpoint for service", "service", serviceName) // Resolve checkpoint for this service info, err := checkpoint.ResolveCheckpointForService(ctx, r.Client, dynamoDeployment.Namespace, component.Checkpoint) if err != nil { logger.Error(err, "Failed to resolve checkpoint for service", "service", serviceName) return nil, nil, fmt.Errorf("failed to resolve checkpoint for service %s: %w", serviceName, err) } // Store checkpoint info for later use in pod spec generation checkpointInfos[serviceName] = info // If no checkpoint found and mode is Auto, create one if info.CheckpointName == "" && component.Checkpoint.Mode == nvidiacomv1alpha1.CheckpointModeAuto { logger.Info("Creating DynamoCheckpoint CR in Auto mode", "service", serviceName) ckpt, err := r.createCheckpointCR(ctx, dynamoDeployment, serviceName, component) if err != nil { logger.Error(err, "Failed to create DynamoCheckpoint CR", "service", serviceName) return nil, nil, fmt.Errorf("failed to create checkpoint for service %s: %w", serviceName, err) } info.CheckpointName = ckpt.Name // Compute hash locally since status may not be populated yet // (checkpoint controller reconciles asynchronously) hash, err := checkpoint.ComputeIdentityHash(*component.Checkpoint.Identity) if err != nil { logger.Error(err, "Failed to compute checkpoint identity hash", "service", serviceName) return nil, nil, fmt.Errorf("failed to compute checkpoint hash for service %s: %w", serviceName, err) } info.Hash = hash info.Ready = false // Newly created checkpoint is not ready yet } // Update status statuses[serviceName] = nvidiacomv1alpha1.ServiceCheckpointStatus{ CheckpointName: info.CheckpointName, IdentityHash: info.Hash, Ready: info.Ready, } } return statuses, checkpointInfos, nil } // createCheckpointCR creates a DynamoCheckpoint CR for a service in Auto mode func (r *DynamoGraphDeploymentReconciler) createCheckpointCR( ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, serviceName string, component *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec, ) (*nvidiacomv1alpha1.DynamoCheckpoint, error) { if component.Checkpoint == nil || component.Checkpoint.Identity == nil { return nil, fmt.Errorf("checkpoint identity is required for Auto mode") } identity := component.Checkpoint.Identity // Compute hash for naming hash, err := checkpoint.ComputeIdentityHash(*identity) if err != nil { return nil, fmt.Errorf("failed to compute identity hash: %w", err) } // Generate checkpoint name: use hash directly (16 chars, 64 bits) // This allows natural deduplication - same identity = same checkpoint name // 16 characters provides excellent collision resistance (1% at 500M configs) ckptName := hash // Use SyncResource to create/update the DynamoCheckpoint CR // Pass nil as parentResource to create an independent checkpoint (no owner reference) // This ensures the checkpoint persists even if the DGD is deleted _, ckpt, err := commoncontroller.SyncResource(ctx, r, nil, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoCheckpoint, bool, error) { // Build the checkpoint identity from service identity checkpointIdentity := nvidiacomv1alpha1.DynamoCheckpointIdentity{ Model: identity.Model, BackendFramework: identity.BackendFramework, DynamoVersion: identity.DynamoVersion, TensorParallelSize: identity.TensorParallelSize, PipelineParallelSize: identity.PipelineParallelSize, Dtype: identity.Dtype, MaxModelLen: identity.MaxModelLen, ExtraParameters: identity.ExtraParameters, } // Build pod template from service spec for checkpoint job // This uses GenerateBasePodSpec to ensure same config as worker pods (image pull secrets, etc.) // Pass framework from checkpoint identity for accurate backend detection podTemplate, err := r.buildCheckpointJobPodTemplate( dynamoDeployment, component, serviceName, identity.BackendFramework, // Use framework from checkpoint identity ) if err != nil { return nil, false, fmt.Errorf("failed to build checkpoint job pod template: %w", err) } ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{ ObjectMeta: metav1.ObjectMeta{ Name: ckptName, Namespace: dynamoDeployment.Namespace, Labels: map[string]string{ consts.KubeLabelDynamoGraphDeploymentName: dynamoDeployment.Name, consts.KubeLabelDynamoComponent: serviceName, consts.KubeLabelCheckpointHash: hash, }, }, Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{ Identity: checkpointIdentity, Job: nvidiacomv1alpha1.DynamoCheckpointJobConfig{ PodTemplateSpec: podTemplate, }, }, } return ckpt, false, nil }) if err != nil { return nil, fmt.Errorf("failed to sync checkpoint CR: %w", err) } return ckpt, nil } // buildCheckpointJobPodTemplate builds a pod template for the checkpoint job from service spec // It reuses GenerateBasePodSpec to ensure checkpoint jobs have the same configuration as regular pods, // including auto-discovered image pull secrets, envFromSecret, resources, security context, etc. func (r *DynamoGraphDeploymentReconciler) buildCheckpointJobPodTemplate( dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, component *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, framework string, // From checkpoint identity (e.g., "vllm", "sglang", "trtllm") ) (corev1.PodTemplateSpec, error) { // Parse framework string to BackendFramework type backendFramework, err := dynamo.ParseBackendFramework(framework) if err != nil { return corev1.PodTemplateSpec{}, err } // Create a copy of the component spec without checkpoint config // The checkpoint job is CREATING the checkpoint, not restoring from one componentForJob := component.DeepCopy() componentForJob.Checkpoint = nil // Ensure DYN_NAMESPACE is set for checkpoint job using the same logic as regular pods // This is required for service discovery and distributed coordination dynamoNamespace := dynamo.GetDynamoNamespace(dynamoDeployment, component) componentForJob.DynamoNamespace = &dynamoNamespace // Generate base PodSpec using the same logic as regular worker pods // This includes: image pull secrets (auto-discovered + explicit), envFromSecret, // resources, security context, tolerations, node selectors, etc. // // Note: For checkpoint jobs, we use Grove deployment type even though it's single-node. // This is because GenerateBasePodSpec requires a valid MultinodeDeployer, and for // single-node cases, the backends simply return early without modifications. podSpec, err := dynamo.GenerateBasePodSpec( componentForJob, backendFramework, r.DockerSecretRetriever, dynamoDeployment.Name, dynamoDeployment.Namespace, dynamo.RoleCheckpoint, // Use checkpoint role 1, // Single node for checkpoint job r.Config, consts.MultinodeDeploymentTypeGrove, // Use Grove (single-node backends return early) serviceName, nil, // No checkpoint info for checkpoint creation jobs ) if err != nil { return corev1.PodTemplateSpec{}, fmt.Errorf("failed to generate base pod spec: %w", err) } // Override RestartPolicy for job (must be Never or OnFailure) podSpec.RestartPolicy = corev1.RestartPolicyNever return corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ consts.KubeLabelDynamoComponent: serviceName, }, }, Spec: *podSpec, }, nil } // reconcileScalingAdapters ensures a DynamoGraphDeploymentScalingAdapter exists for each service in the DGD // that has scaling adapter explicitly enabled. Services without scalingAdapter.enabled=true will not have a DGDSA. // This enables pluggable autoscaling via HPA, KEDA, or Planner. func (r *DynamoGraphDeploymentReconciler) reconcileScalingAdapters(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error { logger := log.FromContext(ctx) // Process each service - SyncResource handles create, update, and delete via toDelete flag for serviceName, component := range dynamoDeployment.Spec.Services { // Check if scaling adapter is enabled for this service (disabled by default) scalingAdapterEnabled := component.ScalingAdapter != nil && component.ScalingAdapter.Enabled // Get current replicas (default to 1 if not set) currentReplicas := int32(1) if component.Replicas != nil { currentReplicas = *component.Replicas } // Use SyncResource to handle creation/updates/deletion // When toDelete=true, SyncResource will delete the existing resource if it exists _, _, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter, bool, error) { adapterName := generateAdapterName(dynamoDeployment.Name, serviceName) adapter := &nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter{ ObjectMeta: metav1.ObjectMeta{ Name: adapterName, Namespace: dynamoDeployment.Namespace, Labels: map[string]string{ consts.KubeLabelDynamoGraphDeploymentName: dynamoDeployment.Name, consts.KubeLabelDynamoComponent: serviceName, }, }, Spec: nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapterSpec{ Replicas: currentReplicas, DGDRef: nvidiacomv1alpha1.DynamoGraphDeploymentServiceRef{ Name: dynamoDeployment.Name, ServiceName: serviceName, }, }, } // Return toDelete=true if scaling adapter is not enabled return adapter, !scalingAdapterEnabled, nil }) if err != nil { logger.Error(err, "Failed to sync DynamoGraphDeploymentScalingAdapter", "service", serviceName) return err } } // Clean up adapters for services that were removed from DGD entirely adapterList := &nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapterList{} if err := r.List(ctx, adapterList, client.InNamespace(dynamoDeployment.Namespace), client.MatchingLabels{consts.KubeLabelDynamoGraphDeploymentName: dynamoDeployment.Name}, ); err != nil { logger.Error(err, "Failed to list DynamoGraphDeploymentScalingAdapters") return err } for i := range adapterList.Items { adapter := &adapterList.Items[i] serviceName := adapter.Spec.DGDRef.ServiceName // Delete adapter if service no longer exists in DGD if _, exists := dynamoDeployment.Spec.Services[serviceName]; !exists { logger.Info("Deleting orphaned DynamoGraphDeploymentScalingAdapter", "adapter", adapter.Name, "service", serviceName) if err := r.Delete(ctx, adapter); err != nil && !errors.IsNotFound(err) { logger.Error(err, "Failed to delete orphaned adapter", "adapter", adapter.Name) return err } r.Recorder.Eventf(dynamoDeployment, corev1.EventTypeNormal, "AdapterDeleted", "Deleted orphaned scaling adapter %s for removed service %s", adapter.Name, serviceName) } } return nil } // generateAdapterName creates a consistent name for a DynamoGraphDeploymentScalingAdapter // Service names are lowercased to comply with Kubernetes DNS subdomain naming requirements func generateAdapterName(dgdName, serviceName string) string { return fmt.Sprintf("%s-%s", dgdName, strings.ToLower(serviceName)) } // hasEPPService checks if the DGD has an EPP service defined // reconcileEPPResources reconciles all EPP-related resources (ConfigMaps, Services, InferencePools) func (r *DynamoGraphDeploymentReconciler) reconcileEPPResources(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) error { logger := log.FromContext(ctx) componentName, eppService, hasEPP := dgd.GetEPPService() if !hasEPP { logger.V(1).Info("No EPP service defined, skipping EPP resource reconciliation") return nil } logger.Info("Reconciling EPP resources", "componentName", componentName) // 1. Reconcile EPP ConfigMap (if needed - not needed when ConfigMapRef is used) if eppService.EPPConfig == nil || eppService.EPPConfig.ConfigMapRef == nil { configMap, err := epp.GenerateConfigMap(ctx, dgd, componentName, eppService.EPPConfig) if err != nil { logger.Error(err, "Failed to generate EPP ConfigMap") return fmt.Errorf("failed to generate EPP ConfigMap: %w", err) } if configMap != nil { _, _, err = commoncontroller.SyncResource(ctx, r, dgd, func(ctx context.Context) (*corev1.ConfigMap, bool, error) { return configMap, false, nil }) if err != nil { logger.Error(err, "Failed to sync EPP ConfigMap") return fmt.Errorf("failed to sync EPP ConfigMap: %w", err) } } } // 2. Reconcile InferencePool // Note: EPP Service is created automatically by the standard component reconciliation // via GenerateComponentService() in graph.go (see ComponentTypeEPP case) eppServiceName := dynamo.GetDCDResourceName(dgd, componentName, "") inferencePool, err := epp.GenerateInferencePool(dgd, componentName, eppServiceName, eppService.EPPConfig) if err != nil { logger.Error(err, "Failed to generate EPP InferencePool") return fmt.Errorf("failed to generate EPP InferencePool: %w", err) } _, _, err = commoncontroller.SyncResource(ctx, r, dgd, func(ctx context.Context) (*gaiev1.InferencePool, bool, error) { return inferencePool, false, nil }) if err != nil { logger.Error(err, "Failed to sync EPP InferencePool") return fmt.Errorf("failed to sync EPP InferencePool: %w", err) } logger.Info("Successfully reconciled EPP resources", "poolName", inferencePool.GetName()) return nil } func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error { // for now doing nothing return nil } // SetupWithManager sets up the controller with the Manager. func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { ctrlBuilder := ctrl.NewControllerManagedBy(mgr). For(&nvidiacomv1alpha1.DynamoGraphDeployment{}, builder.WithPredicates( predicate.GenerationChangedPredicate{}, )). Named(consts.ResourceTypeDynamoGraphDeployment). Owns(&nvidiacomv1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.Funcs{ // ignore creation cause we don't want to be called again after we create the deployment CreateFunc: func(ce event.CreateEvent) bool { return false }, DeleteFunc: func(de event.DeleteEvent) bool { return true }, UpdateFunc: func(de event.UpdateEvent) bool { return true }, GenericFunc: func(ge event.GenericEvent) bool { return true }, })). Owns(&nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter{}, builder.WithPredicates(predicate.Funcs{ // ignore creation cause we don't want to be called again after we create the adapter CreateFunc: func(ce event.CreateEvent) bool { return false }, DeleteFunc: func(de event.DeleteEvent) bool { return true }, UpdateFunc: func(de event.UpdateEvent) bool { return false }, // Adapter updates are handled by adapter controller GenericFunc: func(ge event.GenericEvent) bool { return false }, })). Owns(&corev1.PersistentVolumeClaim{}, builder.WithPredicates(predicate.Funcs{ // ignore creation cause we don't want to be called again after we create the PVC CreateFunc: func(ce event.CreateEvent) bool { return false }, DeleteFunc: func(de event.DeleteEvent) bool { return true }, UpdateFunc: func(de event.UpdateEvent) bool { return true }, GenericFunc: func(ge event.GenericEvent) bool { return true }, })). WithEventFilter(commoncontroller.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig)) if r.RuntimeConfig.GroveEnabled { ctrlBuilder = ctrlBuilder.Owns(&grovev1alpha1.PodCliqueSet{}, builder.WithPredicates(predicate.Funcs{ // ignore creation cause we don't want to be called again after we create the pod gang set CreateFunc: func(ce event.CreateEvent) bool { return false }, DeleteFunc: func(de event.DeleteEvent) bool { return true }, UpdateFunc: func(de event.UpdateEvent) bool { return true }, GenericFunc: func(ge event.GenericEvent) bool { return true }, })). // Watch PodClique resources - only on status changes // Note: We don't need to watch PodCliqueScalingGroup because it's just a container // for PodCliques. The actual status changes happen at the PodClique level. Watches( &grovev1alpha1.PodClique{}, handler.EnqueueRequestsFromMapFunc(r.mapPodCliqueToRequests), builder.WithPredicates(predicate.Funcs{ CreateFunc: func(ce event.CreateEvent) bool { return false }, DeleteFunc: func(de event.DeleteEvent) bool { return false }, UpdateFunc: func(ue event.UpdateEvent) bool { // Only trigger on status changes (readyReplicas or replicas) oldPC, okOld := ue.ObjectOld.(*grovev1alpha1.PodClique) newPC, okNew := ue.ObjectNew.(*grovev1alpha1.PodClique) if !okOld || !okNew { return false } // Trigger if readyReplicas or replicas changed return oldPC.Status.ReadyReplicas != newPC.Status.ReadyReplicas || oldPC.Spec.Replicas != newPC.Spec.Replicas }, GenericFunc: func(ge event.GenericEvent) bool { return false }, }), ) } // Wrap with metrics collection observedReconciler := observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeployment) return ctrlBuilder.Complete(observedReconciler) } func (r *DynamoGraphDeploymentReconciler) GetRecorder() record.EventRecorder { return r.Recorder } // mapPodCliqueToRequests maps a PodClique to reconcile requests for its owning DGD // Uses the nvidia.com/dynamo-graph-deployment-name label for direct lookup - no API calls needed! func (r *DynamoGraphDeploymentReconciler) mapPodCliqueToRequests(ctx context.Context, obj client.Object) []ctrl.Request { podClique, ok := obj.(*grovev1alpha1.PodClique) if !ok { return nil } // PodCliques are labeled with the DGD name and live in the same namespace dgdName, hasLabel := podClique.GetLabels()[consts.KubeLabelDynamoGraphDeploymentName] if !hasLabel || dgdName == "" { log.FromContext(ctx).V(1).Info("PodClique missing DGD label", "podClique", podClique.Name, "namespace", podClique.Namespace) return nil } return []ctrl.Request{{ NamespacedName: types.NamespacedName{ Name: dgdName, Namespace: podClique.Namespace, }, }} }