/* * SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech. Inc * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES */ package controller import ( "bytes" "context" "crypto/md5" "encoding/hex" "fmt" "os" "path/filepath" "strconv" "strings" "text/template" "time" "emperror.dev/errors" commonconfig "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/config" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common" "github.com/apparentlymart/go-shquot/shquot" "github.com/awslabs/amazon-ecr-credential-helper/ecr-login" "github.com/chrismellard/docker-credential-acr-env/pkg/credhelper" "github.com/goccy/go-yaml" "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/name" "github.com/google/go-containerregistry/pkg/v1/google" "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/huandu/xstrings" "github.com/mitchellh/hashstructure/v2" "github.com/rs/xid" "github.com/sergeymakinen/go-quote/unix" "github.com/sirupsen/logrus" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" 
"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/ptr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	apiStoreClient "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/api_store_client"
	dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/schemas"
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
)

// DynamoComponentReconciler reconciles a DynamoComponent object: it makes sure
// the component's container image exists in the registry and, when it does
// not, builds it with a Kubernetes Job.
type DynamoComponentReconciler struct {
	// Embedded controller-runtime client, used to get/list/create/delete
	// objects and to update the DynamoComponent status subresource.
	client.Client
	// Scheme is used to set controller (owner) references on builder Jobs.
	Scheme *runtime.Scheme
	// Recorder emits Kubernetes events describing reconciliation progress.
	Recorder record.EventRecorder
	// Config holds controller-level settings.
	// NOTE(review): not referenced in the visible part of this file — confirm
	// where it is consumed.
	Config controller_common.Config
}

// RBAC markers consumed by controller-gen.
// NOTE(review): the two dynamocomponents rules in the second group duplicate
// the first group; controller-gen merges them, but they could be consolidated.
// NOTE(review): this controller lists, creates and deletes batch Jobs, yet no
// groups=batch,resources=jobs marker is visible here — confirm it is declared
// elsewhere in the package.

// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponents,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponents/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponents/finalizers,verbs=update

//+kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponents,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponents/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch //+kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;patch;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // TODO(user): Modify the Reconcile function to compare the state specified by // the DynamoComponent object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.2/pkg/reconcile // //nolint:gocyclo,nakedret func (r *DynamoComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { logs := log.FromContext(ctx) DynamoComponent := &nvidiacomv1alpha1.DynamoComponent{} err = r.Get(ctx, req.NamespacedName, DynamoComponent) if err != nil { if k8serrors.IsNotFound(err) { // Object not found, return. Created objects are automatically garbage collected. // For additional cleanup logic use finalizers. logs.Info("DynamoComponent resource not found. Ignoring since object must be deleted") err = nil return } // Error reading the object - requeue the request. 
logs.Error(err, "Failed to get DynamoComponent") return } if DynamoComponent.IsReady() { logs.Info("Skip available DynamoComponent") return } if len(DynamoComponent.Status.Conditions) == 0 { DynamoComponent, err = r.setStatusConditions(ctx, req, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageBuilding, Status: metav1.ConditionUnknown, Reason: "Reconciling", Message: "Starting to reconcile DynamoComponent", }, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageExists, Status: metav1.ConditionUnknown, Reason: "Reconciling", Message: "Starting to reconcile DynamoComponent", }, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeDynamoComponentAvailable, Status: metav1.ConditionUnknown, Reason: "Reconciling", Message: "Reconciling", }, ) if err != nil { return } } logs = logs.WithValues("DynamoComponent", DynamoComponent.Name, "DynamoComponentNamespace", DynamoComponent.Namespace) defer func() { if err == nil { logs.Info("Reconcile success") return } logs.Error(err, "Failed to reconcile DynamoComponent.") r.Recorder.Eventf(DynamoComponent, corev1.EventTypeWarning, "ReconcileError", "Failed to reconcile DynamoComponent: %v", err) _, err_ := r.setStatusConditions(ctx, req, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeDynamoComponentAvailable, Status: metav1.ConditionFalse, Reason: "Reconciling", Message: err.Error(), }, ) if err_ != nil { logs.Error(err_, "Failed to update DynamoComponent status") return } }() DynamoComponent, _, imageExists, imageExistsResult, err := r.ensureImageExists(ctx, ensureImageExistsOption{ DynamoComponent: DynamoComponent, req: req, }) if err != nil { err = errors.Wrapf(err, "ensure image exists") return } if !imageExists { result = imageExistsResult DynamoComponent, err = r.setStatusConditions(ctx, req, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeDynamoComponentAvailable, Status: metav1.ConditionUnknown, Reason: 
"Reconciling", Message: "DynamoComponent image is building", }, ) if err != nil { return } return } DynamoComponent, err = r.setStatusConditions(ctx, req, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeDynamoComponentAvailable, Status: metav1.ConditionTrue, Reason: "Reconciling", Message: "DynamoComponent image is generated", }, ) if err != nil { return } return } func isEstargzEnabled() bool { return os.Getenv("ESTARGZ_ENABLED") == commonconsts.KubeLabelValueTrue } type ensureImageExistsOption struct { DynamoComponent *nvidiacomv1alpha1.DynamoComponent req ctrl.Request } //nolint:gocyclo,nakedret func (r *DynamoComponentReconciler) ensureImageExists(ctx context.Context, opt ensureImageExistsOption) (DynamoComponent *nvidiacomv1alpha1.DynamoComponent, imageInfo ImageInfo, imageExists bool, result ctrl.Result, err error) { // nolint: unparam logs := log.FromContext(ctx) DynamoComponent = opt.DynamoComponent req := opt.req imageInfo = r.getImageInfo(GetImageInfoOption{ DynamoComponent: DynamoComponent, }) imageExistsCheckedCondition := meta.FindStatusCondition(DynamoComponent.Status.Conditions, nvidiacomv1alpha1.DynamoComponentConditionTypeImageExistsChecked) imageExistsCondition := meta.FindStatusCondition(DynamoComponent.Status.Conditions, nvidiacomv1alpha1.DynamoComponentConditionTypeImageExists) if imageExistsCheckedCondition == nil || imageExistsCheckedCondition.Status != metav1.ConditionTrue || imageExistsCheckedCondition.Message != imageInfo.ImageName { imageExistsCheckedCondition = &metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageExistsChecked, Status: metav1.ConditionUnknown, Reason: "Reconciling", Message: imageInfo.ImageName, } dynamoComponentAvailableCondition := &metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeDynamoComponentAvailable, Status: metav1.ConditionUnknown, Reason: "Reconciling", Message: "Checking image exists", } DynamoComponent, err = r.setStatusConditions(ctx, req, 
*imageExistsCheckedCondition, *dynamoComponentAvailableCondition) if err != nil { return } r.Recorder.Eventf(DynamoComponent, corev1.EventTypeNormal, "CheckingImage", "Checking image exists: %s", imageInfo.ImageName) imageExists, err = checkImageExists(DynamoComponent, imageInfo.ImageName) if err != nil { err = errors.Wrapf(err, "check image %s exists", imageInfo.ImageName) return } err = r.Get(ctx, req.NamespacedName, DynamoComponent) if err != nil { logs.Error(err, "Failed to re-fetch DynamoComponent") return } if imageExists { r.Recorder.Eventf(DynamoComponent, corev1.EventTypeNormal, "CheckingImage", "Image exists: %s", imageInfo.ImageName) imageExistsCheckedCondition = &metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageExistsChecked, Status: metav1.ConditionTrue, Reason: "Reconciling", Message: imageInfo.ImageName, } imageExistsCondition = &metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageExists, Status: metav1.ConditionTrue, Reason: "Reconciling", Message: imageInfo.ImageName, } DynamoComponent, err = r.setStatusConditions(ctx, req, *imageExistsCondition, *imageExistsCheckedCondition) if err != nil { return } } else { r.Recorder.Eventf(DynamoComponent, corev1.EventTypeNormal, "CheckingImage", "Image not exists: %s", imageInfo.ImageName) imageExistsCheckedCondition = &metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageExistsChecked, Status: metav1.ConditionFalse, Reason: "Reconciling", Message: fmt.Sprintf("Image not exists: %s", imageInfo.ImageName), } imageExistsCondition = &metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageExists, Status: metav1.ConditionFalse, Reason: "Reconciling", Message: fmt.Sprintf("Image %s is not exists", imageInfo.ImageName), } DynamoComponent, err = r.setStatusConditions(ctx, req, *imageExistsCondition, *imageExistsCheckedCondition) if err != nil { return } } } var DynamoComponentHashStr string DynamoComponentHashStr, err = 
r.getHashStr(DynamoComponent) if err != nil { err = errors.Wrapf(err, "get DynamoComponent %s/%s hash string", DynamoComponent.Namespace, DynamoComponent.Name) return } imageExists = imageExistsCondition != nil && imageExistsCondition.Status == metav1.ConditionTrue && imageExistsCondition.Message == imageInfo.ImageName if imageExists { return } jobLabels := map[string]string{ commonconsts.KubeLabelDynamoComponent: DynamoComponent.Name, commonconsts.KubeLabelIsDynamoImageBuilder: commonconsts.KubeLabelValueTrue, } jobs := &batchv1.JobList{} err = r.List(ctx, jobs, client.InNamespace(req.Namespace), client.MatchingLabels(jobLabels)) if err != nil { err = errors.Wrap(err, "list jobs") return } reservedJobs := make([]*batchv1.Job, 0) for _, job_ := range jobs.Items { job_ := job_ oldHash := job_.Annotations[consts.KubeAnnotationDynamoComponentHash] if oldHash != DynamoComponentHashStr { logs.Info("Because hash changed, delete old job", "job", job_.Name, "oldHash", oldHash, "newHash", DynamoComponentHashStr) // --cascade=foreground err = r.Delete(ctx, &job_, &client.DeleteOptions{ PropagationPolicy: &[]metav1.DeletionPropagation{metav1.DeletePropagationForeground}[0], }) if err != nil { err = errors.Wrapf(err, "delete job %s", job_.Name) return } return } else { reservedJobs = append(reservedJobs, &job_) } } var job *batchv1.Job if len(reservedJobs) > 0 { job = reservedJobs[0] } if len(reservedJobs) > 1 { for _, job_ := range reservedJobs[1:] { logs.Info("Because has more than one job, delete old job", "job", job_.Name) // --cascade=foreground err = r.Delete(ctx, job_, &client.DeleteOptions{ PropagationPolicy: &[]metav1.DeletionPropagation{metav1.DeletePropagationForeground}[0], }) if err != nil { err = errors.Wrapf(err, "delete job %s", job_.Name) return } } } if job == nil { job, err = r.generateImageBuilderJob(ctx, GenerateImageBuilderJobOption{ ImageInfo: imageInfo, DynamoComponent: DynamoComponent, }) if err != nil { err = errors.Wrap(err, "generate image builder 
job") return } r.Recorder.Eventf(DynamoComponent, corev1.EventTypeNormal, "GenerateImageBuilderJob", "Creating image builder job: %s", job.Name) err = r.Create(ctx, job) if err != nil { err = errors.Wrapf(err, "create image builder job %s", job.Name) return } r.Recorder.Eventf(DynamoComponent, corev1.EventTypeNormal, "GenerateImageBuilderJob", "Created image builder job: %s", job.Name) return } r.Recorder.Eventf(DynamoComponent, corev1.EventTypeNormal, "CheckingImageBuilderJob", "Found image builder job: %s", job.Name) err = r.Get(ctx, req.NamespacedName, DynamoComponent) if err != nil { logs.Error(err, "Failed to re-fetch DynamoComponent") return } imageBuildingCondition := meta.FindStatusCondition(DynamoComponent.Status.Conditions, nvidiacomv1alpha1.DynamoComponentConditionTypeImageBuilding) isJobFailed := false isJobRunning := true if job.Spec.Completions != nil { if job.Status.Succeeded != *job.Spec.Completions { if job.Status.Failed > 0 { for _, condition := range job.Status.Conditions { if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue { isJobFailed = true break } } } isJobRunning = !isJobFailed } else { isJobRunning = false } } if isJobRunning { conditions := make([]metav1.Condition, 0) if job.Status.Active > 0 { conditions = append(conditions, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageBuilding, Status: metav1.ConditionTrue, Reason: "Reconciling", Message: fmt.Sprintf("Image building job %s is running", job.Name), }) } else { conditions = append(conditions, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageBuilding, Status: metav1.ConditionUnknown, Reason: "Reconciling", Message: fmt.Sprintf("Image building job %s is waiting", job.Name), }) } if DynamoComponent.Spec.ImageBuildTimeout != nil { if imageBuildingCondition != nil && imageBuildingCondition.LastTransitionTime.Add(time.Duration(*DynamoComponent.Spec.ImageBuildTimeout)).Before(time.Now()) { conditions = 
append(conditions, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageBuilding, Status: metav1.ConditionFalse, Reason: "Timeout", Message: fmt.Sprintf("Image building job %s is timeout", job.Name), }) if _, err = r.setStatusConditions(ctx, req, conditions...); err != nil { return } err = errors.New("image build timeout") return } } if DynamoComponent, err = r.setStatusConditions(ctx, req, conditions...); err != nil { return } if imageBuildingCondition != nil && imageBuildingCondition.Status != metav1.ConditionTrue && isJobRunning { r.Recorder.Eventf(DynamoComponent, corev1.EventTypeNormal, "DynamoComponentImageBuilder", "Image is building now") } return } if isJobFailed { DynamoComponent, err = r.setStatusConditions(ctx, req, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageBuilding, Status: metav1.ConditionFalse, Reason: "Reconciling", Message: fmt.Sprintf("Image building job %s is failed.", job.Name), }, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeDynamoComponentAvailable, Status: metav1.ConditionFalse, Reason: "Reconciling", Message: fmt.Sprintf("Image building job %s is failed.", job.Name), }, ) if err != nil { return } return } DynamoComponent, err = r.setStatusConditions(ctx, req, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageBuilding, Status: metav1.ConditionFalse, Reason: "Reconciling", Message: fmt.Sprintf("Image building job %s is succeeded.", job.Name), }, metav1.Condition{ Type: nvidiacomv1alpha1.DynamoComponentConditionTypeImageExists, Status: metav1.ConditionTrue, Reason: "Reconciling", Message: imageInfo.ImageName, }, ) if err != nil { return } r.Recorder.Eventf(DynamoComponent, corev1.EventTypeNormal, "DynamoComponentImageBuilder", "Image has been built successfully") imageExists = true return } func (r *DynamoComponentReconciler) setStatusConditions(ctx context.Context, req ctrl.Request, conditions ...metav1.Condition) (DynamoComponent 
*nvidiacomv1alpha1.DynamoComponent, err error) { DynamoComponent = &nvidiacomv1alpha1.DynamoComponent{} /* Please don't blame me when you see this kind of code, this is to avoid "the object has been modified; please apply your changes to the latest version and try again" when updating CR status, don't doubt that almost all CRD operators (e.g. cert-manager) can't avoid this stupid error and can only try to avoid this by this stupid way. */ for i := 0; i < 3; i++ { if err = r.Get(ctx, req.NamespacedName, DynamoComponent); err != nil { err = errors.Wrap(err, "Failed to re-fetch DynamoComponent") return } for _, condition := range conditions { meta.SetStatusCondition(&DynamoComponent.Status.Conditions, condition) } if err = r.Status().Update(ctx, DynamoComponent); err != nil { time.Sleep(100 * time.Millisecond) } else { break } } if err != nil { err = errors.Wrap(err, "Failed to update DynamoComponent status") return } if err = r.Get(ctx, req.NamespacedName, DynamoComponent); err != nil { err = errors.Wrap(err, "Failed to re-fetch DynamoComponent") return } return } type DynamoComponentImageBuildEngine string const ( DynamoComponentImageBuildEngineKaniko DynamoComponentImageBuildEngine = "kaniko" DynamoComponentImageBuildEngineBuildkit DynamoComponentImageBuildEngine = "buildkit" DynamoComponentImageBuildEngineBuildkitRootless DynamoComponentImageBuildEngine = "buildkit-rootless" ) const ( EnvDynamoImageBuildEngine = "DYNAMO_IMAGE_BUILD_ENGINE" ) func getDynamoComponentImageBuildEngine() DynamoComponentImageBuildEngine { engine := os.Getenv(EnvDynamoImageBuildEngine) if engine == "" { return DynamoComponentImageBuildEngineKaniko } return DynamoComponentImageBuildEngine(engine) } //nolint:nakedret func (r *DynamoComponentReconciler) getApiStoreClient(ctx context.Context) (*apiStoreClient.ApiStoreClient, *commonconfig.ApiStoreConfig, error) { apiStoreConf, err := commonconfig.GetApiStoreConfig(ctx) isNotFound := k8serrors.IsNotFound(err) if err != nil && !isNotFound { 
err = errors.Wrap(err, "get api store config") return nil, nil, err } if isNotFound { return nil, nil, err } if apiStoreConf.Endpoint == "" { return nil, nil, err } if apiStoreConf.ClusterName == "" { apiStoreConf.ClusterName = "default" } apiStoreClient := apiStoreClient.NewApiStoreClient(apiStoreConf.Endpoint) return apiStoreClient, apiStoreConf, nil } //nolint:nakedret func (r *DynamoComponentReconciler) getDockerRegistry(DynamoComponent *nvidiacomv1alpha1.DynamoComponent) *schemas.DockerRegistrySchema { dockerRegistryConfig := commonconfig.GetDockerRegistryConfig() dynamoRepositoryName := "dynamo-components" if dockerRegistryConfig.DynamoComponentsRepositoryName != "" { dynamoRepositoryName = dockerRegistryConfig.DynamoComponentsRepositoryName } dynamoRepositoryURI := fmt.Sprintf("%s/%s", strings.TrimRight(dockerRegistryConfig.Server, "/"), dynamoRepositoryName) if DynamoComponent != nil && DynamoComponent.Spec.DockerConfigJSONSecretName != "" { dockerRegistryConfig.SecretName = DynamoComponent.Spec.DockerConfigJSONSecretName } return &schemas.DockerRegistrySchema{ Server: dockerRegistryConfig.Server, Secure: dockerRegistryConfig.Secure, DynamoRepositoryURI: dynamoRepositoryURI, SecretName: dockerRegistryConfig.SecretName, } } func isAddNamespacePrefix() bool { return os.Getenv("ADD_NAMESPACE_PREFIX_TO_IMAGE_NAME") == trueStr } func getDynamoComponentImagePrefix(DynamoComponent *nvidiacomv1alpha1.DynamoComponent) string { if DynamoComponent == nil { return "" } prefix, exist := DynamoComponent.Annotations[consts.KubeAnnotationDynamoComponentStorageNS] if exist && prefix != "" { return fmt.Sprintf("%s.", prefix) } if isAddNamespacePrefix() { return fmt.Sprintf("%s.", DynamoComponent.Namespace) } return "" } func getDynamoComponentImageName(DynamoComponent *nvidiacomv1alpha1.DynamoComponent, dockerRegistry schemas.DockerRegistrySchema, dynamoComponentRepositoryName, dynamoComponentVersion string) string { if DynamoComponent != nil && DynamoComponent.Spec.Image != 
"" { return DynamoComponent.Spec.Image } var uri, tag string uri = dockerRegistry.DynamoRepositoryURI tail := fmt.Sprintf("%s.%s", dynamoComponentRepositoryName, dynamoComponentVersion) if isEstargzEnabled() { tail += ".esgz" } tag = fmt.Sprintf("dynamo.%s%s", getDynamoComponentImagePrefix(DynamoComponent), tail) if len(tag) > 128 { hashStr := hash(tail) tag = fmt.Sprintf("dynamo.%s%s", getDynamoComponentImagePrefix(DynamoComponent), hashStr) if len(tag) > 128 { tag = fmt.Sprintf("dynamo.%s", hash(fmt.Sprintf("%s%s", getDynamoComponentImagePrefix(DynamoComponent), tail)))[:128] } } return fmt.Sprintf("%s:%s", uri, tag) } func checkImageExists(DynamoComponent *nvidiacomv1alpha1.DynamoComponent, imageName string) (bool, error) { if DynamoComponent.Annotations["nvidia.com/force-build-image"] == commonconsts.KubeLabelValueTrue { return false, nil } ref, err := name.ParseReference(imageName) if err != nil { return false, fmt.Errorf("parsing image reference: %w", err) } keychain := authn.NewMultiKeychain( // This picks up auth from DOCKER_CONFIG env var authn.DefaultKeychain, // This picks up auth from GCR google.Keychain, // This picks up auth from ECR authn.NewKeychainFromHelper(ecr.NewECRHelper()), // This picks up auth from ACR authn.NewKeychainFromHelper(credhelper.NewACRCredentialsHelper()), ) _, err = remote.Head(ref, remote.WithAuthFromKeychain(keychain)) if err != nil { if strings.Contains(err.Error(), "404") { return false, nil } return false, fmt.Errorf("checking image: %w", err) } return true, nil } type ImageInfo struct { DockerRegistry schemas.DockerRegistrySchema DockerConfigJSONSecretName string ImageName string DockerRegistryInsecure bool } type GetImageInfoOption struct { DynamoComponent *nvidiacomv1alpha1.DynamoComponent } //nolint:nakedret func (r *DynamoComponentReconciler) getImageInfo(opt GetImageInfoOption) ImageInfo { dynamoComponentRepositoryName, _, dynamoComponentVersion := xstrings.Partition(opt.DynamoComponent.Spec.DynamoComponent, ":") 
dockerRegistry := r.getDockerRegistry(opt.DynamoComponent) imageInfo := ImageInfo{ DockerRegistry: *dockerRegistry, ImageName: getDynamoComponentImageName(opt.DynamoComponent, *dockerRegistry, dynamoComponentRepositoryName, dynamoComponentVersion), DockerConfigJSONSecretName: dockerRegistry.SecretName, DockerRegistryInsecure: opt.DynamoComponent.Annotations[commonconsts.KubeAnnotationDynamoDockerRegistryInsecure] == "true", } return imageInfo } func (r *DynamoComponentReconciler) getImageBuilderJobName() string { guid := xid.New() return fmt.Sprintf("dynamo-image-builder-%s", guid.String()) } func (r *DynamoComponentReconciler) getImageBuilderJobLabels(DynamoComponent *nvidiacomv1alpha1.DynamoComponent) map[string]string { return map[string]string{ commonconsts.KubeLabelDynamoComponent: DynamoComponent.Name, commonconsts.KubeLabelIsDynamoImageBuilder: "true", } } func (r *DynamoComponentReconciler) getImageBuilderPodLabels(DynamoComponent *nvidiacomv1alpha1.DynamoComponent) map[string]string { return map[string]string{ commonconsts.KubeLabelDynamoComponent: DynamoComponent.Name, commonconsts.KubeLabelIsDynamoImageBuilder: "true", } } func hash(text string) string { // nolint: gosec hasher := md5.New() hasher.Write([]byte(text)) return hex.EncodeToString(hasher.Sum(nil)) } type GenerateImageBuilderJobOption struct { ImageInfo ImageInfo DynamoComponent *nvidiacomv1alpha1.DynamoComponent } //nolint:nakedret func (r *DynamoComponentReconciler) generateImageBuilderJob(ctx context.Context, opt GenerateImageBuilderJobOption) (job *batchv1.Job, err error) { // nolint: gosimple podTemplateSpec, err := r.generateImageBuilderPodTemplateSpec(ctx, GenerateImageBuilderPodTemplateSpecOption(opt)) if err != nil { err = errors.Wrap(err, "generate image builder pod template spec") return } kubeAnnotations := make(map[string]string) hashStr, err := r.getHashStr(opt.DynamoComponent) if err != nil { err = errors.Wrap(err, "failed to get hash string") return } 
kubeAnnotations[consts.KubeAnnotationDynamoComponentHash] = hashStr job = &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: r.getImageBuilderJobName(), Namespace: opt.DynamoComponent.Namespace, Labels: r.getImageBuilderJobLabels(opt.DynamoComponent), Annotations: kubeAnnotations, }, Spec: batchv1.JobSpec{ TTLSecondsAfterFinished: ptr.To(int32(60 * 60 * 24)), Completions: ptr.To(int32(1)), Parallelism: ptr.To(int32(1)), PodFailurePolicy: &batchv1.PodFailurePolicy{ Rules: []batchv1.PodFailurePolicyRule{ { Action: batchv1.PodFailurePolicyActionFailJob, OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{ ContainerName: ptr.To(BuilderContainerName), Operator: batchv1.PodFailurePolicyOnExitCodesOpIn, Values: []int32{BuilderJobFailedExitCode}, }, }, }, }, Template: *podTemplateSpec, }, } err = ctrl.SetControllerReference(opt.DynamoComponent, job, r.Scheme) if err != nil { err = errors.Wrapf(err, "set controller reference for job %s", job.Name) return } return } func injectPodAffinity(podSpec *corev1.PodSpec, DynamoComponent *nvidiacomv1alpha1.DynamoComponent) { if podSpec.Affinity == nil { podSpec.Affinity = &corev1.Affinity{} } if podSpec.Affinity.PodAffinity == nil { podSpec.Affinity.PodAffinity = &corev1.PodAffinity{} } podSpec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(podSpec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution, corev1.WeightedPodAffinityTerm{ Weight: 100, PodAffinityTerm: corev1.PodAffinityTerm{ LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ commonconsts.KubeLabelDynamoComponent: DynamoComponent.Name, }, }, TopologyKey: corev1.LabelHostname, }, }) } const BuilderContainerName = "builder" const BuilderJobFailedExitCode = 42 const ModelSeederContainerName = "seeder" const ModelSeederJobFailedExitCode = 42 type GenerateImageBuilderPodTemplateSpecOption struct { ImageInfo ImageInfo DynamoComponent *nvidiacomv1alpha1.DynamoComponent } //nolint:gocyclo,nakedret func (r 
*DynamoComponentReconciler) generateImageBuilderPodTemplateSpec(ctx context.Context, opt GenerateImageBuilderPodTemplateSpecOption) (pod *corev1.PodTemplateSpec, err error) { dynamoComponentRepositoryName, _, dynamoComponentVersion := xstrings.Partition(opt.DynamoComponent.Spec.DynamoComponent, ":") kubeLabels := r.getImageBuilderPodLabels(opt.DynamoComponent) imageName := opt.ImageInfo.ImageName dockerConfigJSONSecretName := opt.ImageInfo.DockerConfigJSONSecretName dockerRegistryInsecure := opt.ImageInfo.DockerRegistryInsecure volumes := []corev1.Volume{ { Name: "dynamo", VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, { Name: "workspace", VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, } volumeMounts := []corev1.VolumeMount{ { Name: "dynamo", MountPath: "/dynamo", }, { Name: "workspace", MountPath: "/workspace", }, { Name: consts.DockerConfigVolumeName, MountPath: consts.DockerConfigVolumeMountPath, }, } if dockerConfigJSONSecretName != "" { volumes = append(volumes, corev1.Volume{ Name: consts.DockerConfigVolumeName, VolumeSource: corev1.VolumeSource{ Secret: &corev1.SecretVolumeSource{ SecretName: dockerConfigJSONSecretName, Items: []corev1.KeyToPath{ { Key: ".dockerconfigjson", Path: "config.json", }, }, }, }, }) } else { volumes = append(volumes, corev1.Volume{ Name: consts.DockerConfigVolumeName, VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }) } var dynamoComponent *schemas.DynamoComponent dynamoComponentDownloadURL := opt.DynamoComponent.Spec.DownloadURL if dynamoComponentDownloadURL == "" { var apiStoreClient *apiStoreClient.ApiStoreClient var apiStoreConf *commonconfig.ApiStoreConfig apiStoreClient, apiStoreConf, err = r.getApiStoreClient(ctx) if err != nil { err = errors.Wrap(err, "get api store client") return } if apiStoreClient == nil || apiStoreConf == nil { err = errors.New("can't get api store client, please check api store configuration") 
return } r.Recorder.Eventf(opt.DynamoComponent, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting dynamoComponent %s from api store service", opt.DynamoComponent.Spec.DynamoComponent) dynamoComponent, err = apiStoreClient.GetDynamoComponent(ctx, dynamoComponentRepositoryName, dynamoComponentVersion) if err != nil { err = errors.Wrap(err, "get dynamoComponent") return } r.Recorder.Eventf(opt.DynamoComponent, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Got dynamoComponent %s from api store service", opt.DynamoComponent.Spec.DynamoComponent) if dynamoComponent.TransmissionStrategy != nil && *dynamoComponent.TransmissionStrategy == schemas.TransmissionStrategyPresignedURL { var dynamoComponent_ *schemas.DynamoComponent r.Recorder.Eventf(opt.DynamoComponent, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting presigned url for dynamoComponent %s from api store service", opt.DynamoComponent.Spec.DynamoComponent) dynamoComponent_, err = apiStoreClient.PresignDynamoComponentDownloadURL(ctx, dynamoComponentRepositoryName, dynamoComponentVersion) if err != nil { err = errors.Wrap(err, "presign dynamoComponent download url") return } r.Recorder.Eventf(opt.DynamoComponent, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Got presigned url for dynamoComponent %s from api store service", opt.DynamoComponent.Spec.DynamoComponent) dynamoComponentDownloadURL = dynamoComponent_.PresignedDownloadUrl } else { dynamoComponentDownloadURL = fmt.Sprintf("%s/api/v1/dynamo_components/%s/versions/%s/download", apiStoreConf.Endpoint, dynamoComponentRepositoryName, dynamoComponentVersion) } } internalImages := commonconfig.GetInternalImages() logrus.Infof("Image builder is using the images %v", *internalImages) buildEngine := getDynamoComponentImageBuildEngine() dynamoComponentDownloadCommandTemplate, err := template.New("downloadCommand").Parse(` set -e mkdir -p /workspace/buildcontext url="{{.DynamoComponentDownloadURL}}" echo "Downloading dynamoComponent 
{{.DynamoComponentRepositoryName}}:{{.DynamoComponentVersion}} to /tmp/downloaded.tar..." if [[ ${url} == s3://* ]]; then echo "Downloading from s3..." aws s3 cp ${url} /tmp/downloaded.tar elif [[ ${url} == gs://* ]]; then echo "Downloading from GCS..." gsutil cp ${url} /tmp/downloaded.tar else curl --fail -L ${url} --output /tmp/downloaded.tar --progress-bar fi cd /workspace/buildcontext echo "Extracting dynamoComponent tar file..." tar -xvf /tmp/downloaded.tar echo "Removing dynamoComponent tar file..." rm /tmp/downloaded.tar echo "Done" `) if err != nil { err = errors.Wrap(err, "failed to parse download command template") return } var dynamoComponentDownloadCommandBuffer bytes.Buffer err = dynamoComponentDownloadCommandTemplate.Execute(&dynamoComponentDownloadCommandBuffer, map[string]interface{}{ "DynamoComponentDownloadURL": dynamoComponentDownloadURL, "DynamoComponentRepositoryName": dynamoComponentRepositoryName, "DynamoComponentVersion": dynamoComponentVersion, }) if err != nil { err = errors.Wrap(err, "failed to execute download command template") return } dynamoComponentDownloadCommand := dynamoComponentDownloadCommandBuffer.String() downloaderContainerResources := corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1000m"), corev1.ResourceMemory: resource.MustParse("3000Mi"), }, Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("100m"), corev1.ResourceMemory: resource.MustParse("1000Mi"), }, } downloaderContainerEnvFrom := opt.DynamoComponent.Spec.DownloaderContainerEnvFrom initContainers := []corev1.Container{ { Name: "dynamocomponent-downloader", Image: internalImages.DynamoComponentsDownloader, Command: []string{ "sh", "-c", dynamoComponentDownloadCommand, }, VolumeMounts: volumeMounts, Resources: downloaderContainerResources, EnvFrom: downloaderContainerEnvFrom, Env: []corev1.EnvVar{ { Name: "AWS_EC2_METADATA_DISABLED", Value: "true", }, }, }, } if dockerConfigJSONSecretName == "" { // 
if no explicit docker config is provided, we need to provide the docker config to the image builder var ref name.Reference ref, err = name.ParseReference(imageName) if err != nil { err = errors.Wrap(err, "failed to parse reference") return } dockerRegistry := ref.Context().RegistryStr() if isGoogleRegistry(dockerRegistry) { // for GCP, we use the google cloud sdk to get the docker config. initContainers = append(initContainers, corev1.Container{ Name: "gcp-init-docker-config", Image: "google/cloud-sdk:slim", Command: []string{ "/bin/bash", "-c", fmt.Sprintf(`set -e gcloud --quiet config get-value account TOKEN=$(gcloud --quiet auth print-access-token) cat > %s/config.json < 0 { builderContainerEnvs = append(builderContainerEnvs, corev1.EnvVar{ Name: "BUILDKITD_FLAGS", Value: strings.Join(buildkitdFlags, " "), }) } buildkitURL := os.Getenv("BUILDKIT_URL") if buildkitURL == "" { err = errors.New("BUILDKIT_URL is not set") return } command = []string{ "buildctl", } args = []string{ "--addr", buildkitURL, "build", "--frontend", "dockerfile.v0", "--local", "context=/workspace/buildcontext", "--local", fmt.Sprintf("dockerfile=%s", filepath.Dir(dockerFilePath)), "--output", output, } cacheRepo := os.Getenv("BUILDKIT_CACHE_REPO") if cacheRepo != "" { args = append(args, "--export-cache", fmt.Sprintf("type=registry,ref=%s:buildcache,mode=max,compression=zstd,ignore-error=true", cacheRepo)) args = append(args, "--import-cache", fmt.Sprintf("type=registry,ref=%s:buildcache", cacheRepo)) } } // add build args to pass via --build-arg for _, buildArg := range buildArgs { quotedBuildArg := unix.SingleQuote.Quote(buildArg) if isBuildkit { args = append(args, "--opt", fmt.Sprintf("build-arg:%s", quotedBuildArg)) } else { args = append(args, fmt.Sprintf("--build-arg=%s", quotedBuildArg)) } } // add other arguments to builder args = append(args, builderArgs...) 
logrus.Info("dynamo-image-builder args: ", args) builderContainerArgs := []string{ "-c", fmt.Sprintf("sleep 15; %s && exit 0 || exit %d", shquot.POSIXShell(append(command, args...)), BuilderJobFailedExitCode), // TODO: remove once functionality exists to wait for istio sidecar. } container := corev1.Container{ Name: BuilderContainerName, Image: builderImage, ImagePullPolicy: corev1.PullAlways, Command: []string{"sh"}, Args: builderContainerArgs, VolumeMounts: volumeMounts, Env: builderContainerEnvs, EnvFrom: builderContainerEnvFrom, TTY: true, Stdin: true, } if buildEngine == DynamoComponentImageBuildEngineKaniko { // we need to run as root when using kaniko container.SecurityContext = &corev1.SecurityContext{ RunAsUser: ptr.To(int64(0)), } } if globalDefaultImageBuilderContainerResources != nil { container.Resources = *globalDefaultImageBuilderContainerResources } if opt.DynamoComponent.Spec.ImageBuilderContainerResources != nil { container.Resources = *opt.DynamoComponent.Spec.ImageBuilderContainerResources } containers = append(containers, container) pod = &corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: kubeLabels, Annotations: kubeAnnotations, }, Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyNever, Volumes: volumes, InitContainers: initContainers, Containers: containers, SecurityContext: &corev1.PodSecurityContext{ RunAsUser: ptr.To(int64(1000)), RunAsGroup: ptr.To(int64(1000)), FSGroup: ptr.To(int64(1000)), }, }, } if globalExtraPodMetadata != nil { for k, v := range globalExtraPodMetadata.Annotations { pod.Annotations[k] = v } for k, v := range globalExtraPodMetadata.Labels { pod.Labels[k] = v } } if opt.DynamoComponent.Spec.ImageBuilderExtraPodMetadata != nil { for k, v := range opt.DynamoComponent.Spec.ImageBuilderExtraPodMetadata.Annotations { pod.Annotations[k] = v } for k, v := range opt.DynamoComponent.Spec.ImageBuilderExtraPodMetadata.Labels { pod.Labels[k] = v } } if globalExtraPodSpec != nil { pod.Spec.PriorityClassName = 
globalExtraPodSpec.PriorityClassName pod.Spec.SchedulerName = globalExtraPodSpec.SchedulerName pod.Spec.NodeSelector = globalExtraPodSpec.NodeSelector pod.Spec.Affinity = globalExtraPodSpec.Affinity pod.Spec.Tolerations = globalExtraPodSpec.Tolerations pod.Spec.TopologySpreadConstraints = globalExtraPodSpec.TopologySpreadConstraints pod.Spec.ServiceAccountName = globalExtraPodSpec.ServiceAccountName } if opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec != nil { if opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.PriorityClassName != "" { pod.Spec.PriorityClassName = opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.PriorityClassName } if opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.SchedulerName != "" { pod.Spec.SchedulerName = opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.SchedulerName } if opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.NodeSelector != nil { pod.Spec.NodeSelector = opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.NodeSelector } if opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.Affinity != nil { pod.Spec.Affinity = opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.Affinity } if opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.Tolerations != nil { pod.Spec.Tolerations = opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.Tolerations } if opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.TopologySpreadConstraints != nil { pod.Spec.TopologySpreadConstraints = opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.TopologySpreadConstraints } if opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.ServiceAccountName != "" { pod.Spec.ServiceAccountName = opt.DynamoComponent.Spec.ImageBuilderExtraPodSpec.ServiceAccountName } } injectPodAffinity(&pod.Spec, opt.DynamoComponent) if pod.Spec.ServiceAccountName == "" { serviceAccounts := &corev1.ServiceAccountList{} err = r.List(ctx, serviceAccounts, client.InNamespace(opt.DynamoComponent.Namespace), client.MatchingLabels{ commonconsts.KubeLabelDynamoImageBuilderPod: commonconsts.KubeLabelValueTrue, 
})
		if err != nil {
			err = errors.Wrapf(err, "failed to list service accounts in namespace %s", opt.DynamoComponent.Namespace)
			return
		}
		if len(serviceAccounts.Items) > 0 {
			pod.Spec.ServiceAccountName = serviceAccounts.Items[0].Name
		} else {
			pod.Spec.ServiceAccountName = "default"
		}
	}

	for i, c := range pod.Spec.InitContainers {
		env := c.Env
		if globalExtraContainerEnv != nil {
			env = append(env, globalExtraContainerEnv...)
		}
		env = append(env, opt.DynamoComponent.Spec.ImageBuilderExtraContainerEnv...)
		pod.Spec.InitContainers[i].Env = env
	}

	for i, c := range pod.Spec.Containers {
		env := c.Env
		if globalExtraContainerEnv != nil {
			env = append(env, globalExtraContainerEnv...)
		}
		env = append(env, opt.DynamoComponent.Spec.ImageBuilderExtraContainerEnv...)
		pod.Spec.Containers[i].Env = env
	}

	return
}

// getHashStr returns a decimal-string fingerprint of the DynamoComponent's
// Spec, Labels and Annotations.
//
// The three fields are packed into an anonymous struct and hashed with
// hashstructure using FormatV2; the resulting uint64 is rendered in base 10.
// Other object fields (status, metadata other than labels/annotations) do not
// contribute to the hash.
//
// A hashing failure is returned wrapped as "get DynamoComponent CR spec hash",
// together with an empty string.
func (r *DynamoComponentReconciler) getHashStr(dynamoComponent *nvidiacomv1alpha1.DynamoComponent) (string, error) {
	hash, err := hashstructure.Hash(struct {
		Spec        nvidiacomv1alpha1.DynamoComponentSpec
		Labels      map[string]string
		Annotations map[string]string
	}{
		Spec:        dynamoComponent.Spec,
		Labels:      dynamoComponent.Labels,
		Annotations: dynamoComponent.Annotations,
	}, hashstructure.FormatV2, nil)
	if err != nil {
		return "", errors.Wrap(err, "get DynamoComponent CR spec hash")
	}
	return strconv.FormatUint(hash, 10), nil
}

const (
	// trueStr is the literal string form of boolean true.
	trueStr = "true"
)

// SetupWithManager sets up the controller with the Manager.
//
// It reconciles DynamoComponent objects (the primary watch is filtered by
// GenerationChangedPredicate, so update events that do not bump
// metadata.generation are ignored), also watches Jobs owned by a
// DynamoComponent, and applies the EphemeralDeploymentEventFilter built from
// r.Config to every watch.
//
// NOTE(review): Owns(&DynamoComponent{}) registers an owner-reference watch on
// the same kind already watched via For — confirm this is intentional.
func (r *DynamoComponentReconciler) SetupWithManager(mgr ctrl.Manager) error {
	err := ctrl.NewControllerManagedBy(mgr).
		For(&nvidiacomv1alpha1.DynamoComponent{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Owns(&nvidiacomv1alpha1.DynamoComponent{}).
		Owns(&batchv1.Job{}).
		WithEventFilter(controller_common.EphemeralDeploymentEventFilter(r.Config)).
		Complete(r)
	// emperror's errors.Wrap returns nil for a nil err, so success yields nil.
	return errors.Wrap(err, "failed to setup DynamoComponent controller")
}