Commit 99cc11e6 authored by julienmancuso's avatar julienmancuso Committed by GitHub
Browse files

feat: cleanup operator code (#529)

parent 53d30882
...@@ -94,7 +94,9 @@ dynamo-api-server: ...@@ -94,7 +94,9 @@ dynamo-api-server:
resource_scope: "user" resource_scope: "user"
image: image:
repository: gitlab-master.nvidia.com:5005/aire/microservices/compoundai/dynamo-api-server repository: gitlab-master.nvidia.com:5005/aire/microservices/compoundai/dynamo-api-server
tag: ${CI_COMMIT_SHA} # temporarily force to use old commit for api-server
tag: fccbb8777fbd2ac11dad4871c8a8ba6884525e07
#tag: ${CI_COMMIT_SHA}
pullPolicy: IfNotPresent pullPolicy: IfNotPresent
storeImage: storeImage:
repository: gitlab-master.nvidia.com:5005/aire/microservices/compoundai/dynamo-api-store repository: gitlab-master.nvidia.com:5005/aire/microservices/compoundai/dynamo-api-store
......
modelschemas and schemasv1 are from https://github.com/bentoml/yatai-schemas
common, yatai-client and conversion are from yatai-deployment operator
\ No newline at end of file
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schemas
import (
"encoding/json"
"errors"
"time"
)
// DynamoNIM is the JSON document describing a Dynamo NIM: a presigned
// download URL, an optional transmission strategy and an optional manifest.
type DynamoNIM struct {
	// PresignedDownloadUrl maps to the "presigned_download_url" JSON key.
	PresignedDownloadUrl string `json:"presigned_download_url"`
	// TransmissionStrategy maps to "transmission_strategy"; nil when absent or null.
	TransmissionStrategy *TransmissionStrategy `json:"transmission_strategy"`
	// Manifest maps to "manifest"; nil when absent or null.
	Manifest *DynamoNIMManifest `json:"manifest"`
}
// TransmissionStrategy is the string-valued enum carried in the
// "transmission_strategy" field of DynamoNIM.
type TransmissionStrategy string

// Known TransmissionStrategy wire values.
const (
	// TransmissionStrategyPresignedURL is encoded as "presigned_url".
	TransmissionStrategyPresignedURL = TransmissionStrategy("presigned_url")
	// TransmissionStrategyProxy is encoded as "proxy".
	TransmissionStrategyProxy = TransmissionStrategy("proxy")
)
// DynamoNIMManifest is the "manifest" section of a DynamoNIM document.
type DynamoNIMManifest struct {
	// BentomlVersion maps to the "bentoml_version" JSON key.
	BentomlVersion string `json:"bentoml_version"`
	// Models maps to the "models" JSON key.
	Models []string `json:"models"`
}
// Duration is a time.Duration with a human-readable JSON form: it is written
// as a string such as "1m30s" and can be read back from either such a string
// or a plain JSON number of nanoseconds.
type Duration time.Duration

// MarshalJSON encodes the duration as the JSON string produced by
// time.Duration.String (for example "1m30s").
func (d Duration) MarshalJSON() ([]byte, error) {
	return json.Marshal(time.Duration(d).String())
}

// UnmarshalJSON decodes a duration from JSON.
//
// Accepted inputs:
//   - a JSON string understood by time.ParseDuration, e.g. "90s" or "1h30m";
//   - a JSON number, interpreted as nanoseconds (fractions are truncated);
//   - JSON null, which leaves the receiver unchanged.
//
// Any other JSON value yields an "invalid duration" error; malformed JSON and
// unparsable duration strings return the underlying decoding/parsing error.
func (d *Duration) UnmarshalJSON(b []byte) error {
	var v any
	if err := json.Unmarshal(b, &v); err != nil {
		return err
	}
	switch value := v.(type) {
	case nil:
		// encoding/json convention: UnmarshalJSON([]byte("null")) is a no-op,
		// so documents carrying an explicit null for a duration still decode.
		return nil
	case float64:
		*d = Duration(time.Duration(value))
		return nil
	case string:
		parsed, err := time.ParseDuration(value)
		if err != nil {
			return err
		}
		*d = Duration(parsed)
		return nil
	default:
		return errors.New("invalid duration")
	}
}
// DeploymentStrategy is a string-valued enum naming a deployment rollout strategy.
type DeploymentStrategy string

// Known DeploymentStrategy values; each constant's string value equals its
// name without the "DeploymentStrategy" prefix.
const (
	DeploymentStrategyRollingUpdate               = DeploymentStrategy("RollingUpdate")
	DeploymentStrategyRecreate                    = DeploymentStrategy("Recreate")
	DeploymentStrategyRampedSlowRollout           = DeploymentStrategy("RampedSlowRollout")
	DeploymentStrategyBestEffortControlledRollout = DeploymentStrategy("BestEffortControlledRollout")
)
// DockerRegistrySchema is the JSON representation of Docker registry settings:
// repository URIs for bentos and models (each with an in-cluster variant), the
// registry server, its credentials and a "secure" flag. JSON keys are the
// lower-camel-case forms of the field names.
type DockerRegistrySchema struct {
	// Repository URIs.
	BentosRepositoryURI string `json:"bentosRepositoryURI"`
	ModelsRepositoryURI string `json:"modelsRepositoryURI"`

	// In-cluster variants of the repository URIs.
	BentosRepositoryURIInCluster string `json:"bentosRepositoryURIInCluster"`
	ModelsRepositoryURIInCluster string `json:"modelsRepositoryURIInCluster"`

	// Registry server and credentials.
	Server   string `json:"server"`
	Username string `json:"username"`
	Password string `json:"password"`
	Secure   bool   `json:"secure"`
}
/* /*
* SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech. Inc
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
* *
...@@ -13,6 +14,7 @@ ...@@ -13,6 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
* Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
*/ */
package yataiclient package yataiclient
...@@ -20,13 +22,15 @@ package yataiclient ...@@ -20,13 +22,15 @@ package yataiclient
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/modelschemas" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemas"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemasv1" )
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/consts" const (
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/reqcli" YataiApiTokenHeaderName = "X-YATAI-API-TOKEN"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/utils" NgcOrganizationHeaderName = "Nv-Ngc-Org"
NgcUserHeaderName = "Nv-Actor-Id"
) )
type DynamoAuthHeaders struct { type DynamoAuthHeaders struct {
...@@ -51,147 +55,28 @@ func (c *YataiClient) SetAuth(headers DynamoAuthHeaders) { ...@@ -51,147 +55,28 @@ func (c *YataiClient) SetAuth(headers DynamoAuthHeaders) {
c.headers = headers c.headers = headers
} }
func (c *YataiClient) getJSONReqBuilder() *reqcli.JsonRequestBuilder { func (c *YataiClient) getHeaders() map[string]string {
return reqcli.NewJsonRequestBuilder().Headers(map[string]string{ return map[string]string{
consts.YataiApiTokenHeaderName: c.apiToken, YataiApiTokenHeaderName: c.apiToken,
consts.NgcOrganizationHeaderName: c.headers.OrgId, NgcOrganizationHeaderName: c.headers.OrgId,
consts.NgcUserHeaderName: c.headers.UserId, NgcUserHeaderName: c.headers.UserId,
}) }
}
func (c *YataiClient) ListBentos(ctx context.Context, req schemasv1.ListQuerySchema) (bentos *schemasv1.BentoWithRepositoryListSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, "/api/v1/bentos", map[string]string{
"start": fmt.Sprintf("%d", req.Start),
"count": fmt.Sprintf("%d", req.Count),
"q": string(req.Q),
})
bentos = &schemasv1.BentoWithRepositoryListSchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(bentos).Do(ctx)
return
}
func (c *YataiClient) ListImageBuildStatusUnsyncedBentos(ctx context.Context) (bentos []*schemasv1.BentoWithRepositorySchema, err error) {
url_ := utils.UrlJoin(c.endpoint, "/api/v1/image_build_status_unsynced_bentos")
bentos = []*schemasv1.BentoWithRepositorySchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(&bentos).Do(ctx)
return
}
func (c *YataiClient) UpdateBentoImageBuildStatusSyncingAt(ctx context.Context, bentoRepositoryName, bentoVersion string) (err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/dynamo_nims/%s/versions/%s/update_image_build_status_syncing_at", bentoRepositoryName, bentoVersion))
_, err = c.getJSONReqBuilder().Method("PATCH").Url(url_).Do(ctx)
return
}
func (c *YataiClient) UpdateBentoImageBuildStatus(ctx context.Context, bentoRepositoryName, bentoVersion string, status modelschemas.ImageBuildStatus) (err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/dynamo_nims/%s/versions/%s/update_image_build_status", bentoRepositoryName, bentoVersion))
_, err = c.getJSONReqBuilder().Method("PATCH").Payload(map[string]string{
"image_build_status": string(status),
}).Url(url_).Do(ctx)
return
}
func (c *YataiClient) GetBento(ctx context.Context, bentoRepositoryName, bentoVersion string) (bento *schemasv1.BentoFullSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/bento_repositories/%s/bentos/%s", bentoRepositoryName, bentoVersion))
bento = &schemasv1.BentoFullSchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(bento).Do(ctx)
return
}
func (c *YataiClient) GetModel(ctx context.Context, modelRepositoryName, modelVersion string) (model *schemasv1.ModelFullSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/model_repositories/%s/models/%s", modelRepositoryName, modelVersion))
model = &schemasv1.ModelFullSchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(model).Do(ctx)
return
}
func (c *YataiClient) GetBentoRepository(ctx context.Context, bentoRepositoryName string) (bentoRepository *schemasv1.BentoRepositorySchema, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/bento_repositories/%s", bentoRepositoryName))
bentoRepository = &schemasv1.BentoRepositorySchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(bentoRepository).Do(ctx)
return
}
func (c *YataiClient) GetDeployment(ctx context.Context, clusterName, namespace, deploymentName string) (deployment *schemasv1.DeploymentSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/clusters/%s/namespaces/%s/deployments/%s", clusterName, namespace, deploymentName))
deployment = &schemasv1.DeploymentSchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(deployment).Do(ctx)
return
}
func (c *YataiClient) SyncDeploymentStatus(ctx context.Context, clusterName, namespace, deploymentName string) (deployment *schemasv1.DeploymentSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/clusters/%s/namespaces/%s/deployments/%s/sync_status", clusterName, namespace, deploymentName))
deployment = &schemasv1.DeploymentSchema{}
_, err = c.getJSONReqBuilder().Method("POST").Url(url_).Result(deployment).Do(ctx)
return
}
func (c *YataiClient) CreateDeployment(ctx context.Context, clusterName string, schema *schemasv1.CreateDeploymentSchema) (deployment *schemasv1.DeploymentSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/clusters/%s/deployments", clusterName))
deployment = &schemasv1.DeploymentSchema{}
_, err = c.getJSONReqBuilder().Method("POST").Url(url_).Payload(schema).Result(deployment).Do(ctx)
return
}
func (c *YataiClient) UpdateDeployment(ctx context.Context, clusterName, namespace, deploymentName string, schema *schemasv1.UpdateDeploymentSchema) (deployment *schemasv1.DeploymentSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/clusters/%s/namespaces/%s/deployments/%s", clusterName, namespace, deploymentName))
deployment = &schemasv1.DeploymentSchema{}
_, err = c.getJSONReqBuilder().Method("PATCH").Url(url_).Payload(schema).Result(deployment).Do(ctx)
return
}
func (c *YataiClient) GetDockerRegistryRef(ctx context.Context, clusterName string) (registryRef *modelschemas.DockerRegistryRefSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/clusters/%s/docker_registry_ref", clusterName))
registryRef = &modelschemas.DockerRegistryRefSchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(registryRef).Do(ctx)
return
}
func (c *YataiClient) GetMajorCluster(ctx context.Context) (cluster *schemasv1.ClusterFullSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, "/api/v1/current_org/major_cluster")
cluster = &schemasv1.ClusterFullSchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(cluster).Do(ctx)
return
}
func (c *YataiClient) GetVersion(ctx context.Context) (version *schemasv1.VersionSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, "/api/v1/version")
version = &schemasv1.VersionSchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(version).Do(ctx)
return
}
func (c *YataiClient) GetOrganization(ctx context.Context) (organization *schemasv1.OrganizationFullSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, "/api/v1/current_org")
organization = &schemasv1.OrganizationFullSchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(organization).Do(ctx)
return
}
func (c *YataiClient) GetCluster(ctx context.Context, clusterName string) (cluster *schemasv1.ClusterFullSchema, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/clusters/%s", clusterName))
cluster = &schemasv1.ClusterFullSchema{}
_, err = c.getJSONReqBuilder().Method("GET").Url(url_).Result(cluster).Do(ctx)
return
} }
func (c *YataiClient) RegisterYataiComponent(ctx context.Context, clusterName string, schema *schemasv1.RegisterYataiComponentSchema) (yataiComponent *schemasv1.YataiComponentSchema, err error) { func (c *YataiClient) GetBento(ctx context.Context, bentoRepositoryName, bentoVersion string) (bento *schemas.DynamoNIM, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/clusters/%s/yatai_components", clusterName)) url_ := urlJoin(c.endpoint, fmt.Sprintf("/api/v1/bento_repositories/%s/bentos/%s", bentoRepositoryName, bentoVersion))
yataiComponent = &schemasv1.YataiComponentSchema{} bento = &schemas.DynamoNIM{}
_, err = c.getJSONReqBuilder().Method("POST").Url(url_).Payload(schema).Result(yataiComponent).Do(ctx) _, err = DoJsonRequest(ctx, "GET", url_, c.getHeaders(), nil, nil, bento, nil)
return return
} }
func (c *YataiClient) PresignBentoDownloadURL(ctx context.Context, bentoRepositoryName, bentoVersion string) (bento *schemasv1.BentoSchema, err error) { func (c *YataiClient) PresignBentoDownloadURL(ctx context.Context, bentoRepositoryName, bentoVersion string) (bento *schemas.DynamoNIM, err error) {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/dynamo_nims/%s/versions/%s/presign_download_url", bentoRepositoryName, bentoVersion)) url_ := urlJoin(c.endpoint, fmt.Sprintf("/api/v1/dynamo_nims/%s/versions/%s/presign_download_url", bentoRepositoryName, bentoVersion))
bento = &schemasv1.BentoSchema{} bento = &schemas.DynamoNIM{}
_, err = c.getJSONReqBuilder().Method("PATCH").Url(url_).Result(bento).Do(ctx) _, err = DoJsonRequest(ctx, "PATCH", url_, c.getHeaders(), nil, nil, bento, nil)
return return
} }
func (c *YataiClient) PresignModelDownloadURL(ctx context.Context, modelRepositoryName, modelVersion string) (model *schemasv1.ModelSchema, err error) { func urlJoin(baseURL string, pathPart string) string {
url_ := utils.UrlJoin(c.endpoint, fmt.Sprintf("/api/v1/model_repositories/%s/models/%s/presign_download_url", modelRepositoryName, modelVersion)) return strings.TrimRight(baseURL, "/") + "/" + strings.TrimLeft(pathPart, "/")
model = &schemasv1.ModelSchema{}
_, err = c.getJSONReqBuilder().Method("PATCH").Url(url_).Result(model).Do(ctx)
return
} }
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yataiclient
import (
"context"
"crypto/tls"
"fmt"
"time"
"resty.dev/v3"
)
// defaultClient is the resty client shared by all callers of GetDefaultClient.
// It is constructed during package initialization, which the Go runtime
// performs exactly once before any other code in the package runs, so
// concurrent callers cannot race on its creation (the previous lazy
// "if nil then assign" initialization was unsynchronized).
var defaultClient = newDefaultClient()

// newDefaultClient builds the shared client: 90s timeout, retry count 3 with a
// 2s retry wait time and 10s maximum retry wait time, and a JSON Content-Type
// header on every request.
func newDefaultClient() *resty.Client {
	return resty.New().
		SetTimeout(90 * time.Second).
		SetRetryCount(3).
		SetRetryWaitTime(2 * time.Second).
		SetRetryMaxWaitTime(10 * time.Second).
		SetHeader("Content-Type", "application/json").
		// NOTE(review): InsecureSkipVerify disables TLS certificate
		// verification for every request made through this client. It is kept
		// to preserve existing behavior; confirm whether the endpoints really
		// require it and prefer configuring a trusted CA instead.
		SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
}

// GetDefaultClient returns the shared resty client. The same instance is
// returned on every call, so settings changed on it affect all users.
func GetDefaultClient() *resty.Client {
	return defaultClient
}
// DoJsonRequest performs one HTTP request with the shared default client.
//
// Parameters:
//   - ctx: request context.
//   - method: one of "GET", "POST", "PUT", "DELETE" or "PATCH".
//   - url: target URL.
//   - headers, query: extra request headers and query parameters (may be nil).
//   - payload: request body passed to resty's SetBody (may be nil).
//   - result: value passed to resty's SetResult to receive the decoded
//     response (may be nil).
//   - timeout: optional timeout for this request only; nil keeps the client's
//     default.
//
// It returns the HTTP status code and an error. The status is 0 when the
// method is unsupported or the request itself fails; when the response is an
// error response (resp.IsError), the status code is returned together with an
// error that includes the response body.
func DoJsonRequest(ctx context.Context, method string, url string, headers map[string]string, query map[string]string, payload interface{}, result interface{}, timeout *time.Duration) (int, error) {
	req := GetDefaultClient().R().
		SetContext(ctx).
		SetBody(payload).
		SetResult(result).
		SetHeaders(headers).
		SetQueryParams(query)
	if timeout != nil {
		// Apply the override to this request only. Setting it on the shared
		// client would silently change the timeout of every later request and
		// race with concurrent callers.
		req.SetTimeout(*timeout)
	}

	var (
		resp *resty.Response
		err  error
	)
	switch method {
	case "GET":
		resp, err = req.Get(url)
	case "POST":
		resp, err = req.Post(url)
	case "PUT":
		resp, err = req.Put(url)
	case "DELETE":
		resp, err = req.Delete(url)
	case "PATCH":
		resp, err = req.Patch(url)
	default:
		return 0, fmt.Errorf("unsupported method: %s", method)
	}
	if err != nil {
		return 0, fmt.Errorf("request error: %w", err)
	}

	if resp.IsError() {
		return resp.StatusCode(), fmt.Errorf("http %s %s failed with status %d: %s", method, url, resp.StatusCode(), resp.String())
	}
	return resp.StatusCode(), nil
}
...@@ -21,7 +21,7 @@ package v1alpha1 ...@@ -21,7 +21,7 @@ package v1alpha1
import ( import (
dynamoCommon "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common" dynamoCommon "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/modelschemas" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemas"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
...@@ -53,7 +53,7 @@ type DynamoNimRequestSpec struct { ...@@ -53,7 +53,7 @@ type DynamoNimRequestSpec struct {
// +kubebuilder:validation:Optional // +kubebuilder:validation:Optional
Image string `json:"image,omitempty"` Image string `json:"image,omitempty"`
ImageBuildTimeout *modelschemas.Duration `json:"imageBuildTimeout,omitempty"` ImageBuildTimeout *schemas.Duration `json:"imageBuildTimeout,omitempty"`
// +kubebuilder:validation:Optional // +kubebuilder:validation:Optional
BuildArgs []string `json:"buildArgs,omitempty"` BuildArgs []string `json:"buildArgs,omitempty"`
......
...@@ -25,7 +25,7 @@ package v1alpha1 ...@@ -25,7 +25,7 @@ package v1alpha1
import ( import (
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/modelschemas" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemas"
"k8s.io/api/autoscaling/v2" "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1"
...@@ -570,7 +570,7 @@ func (in *DynamoNimRequestSpec) DeepCopyInto(out *DynamoNimRequestSpec) { ...@@ -570,7 +570,7 @@ func (in *DynamoNimRequestSpec) DeepCopyInto(out *DynamoNimRequestSpec) {
} }
if in.ImageBuildTimeout != nil { if in.ImageBuildTimeout != nil {
in, out := &in.ImageBuildTimeout, &out.ImageBuildTimeout in, out := &in.ImageBuildTimeout, &out.ImageBuildTimeout
*out = new(modelschemas.Duration) *out = new(schemas.Duration)
**out = **in **out = **in
} }
if in.BuildArgs != nil { if in.BuildArgs != nil {
......
...@@ -15,9 +15,7 @@ require ( ...@@ -15,9 +15,7 @@ require (
github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/onsi/ginkgo/v2 v2.19.0 github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1 github.com/onsi/gomega v1.33.1
github.com/pkg/errors v0.9.1
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.71.2 github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.71.2
github.com/prometheus/common v0.55.0
github.com/prune998/docker-registry-client v0.0.0-20200114164314-f8cd511a014c github.com/prune998/docker-registry-client v0.0.0-20200114164314-f8cd511a014c
github.com/rs/xid v1.4.0 github.com/rs/xid v1.4.0
github.com/sergeymakinen/go-quote v1.1.0 github.com/sergeymakinen/go-quote v1.1.0
...@@ -30,6 +28,7 @@ require ( ...@@ -30,6 +28,7 @@ require (
k8s.io/apimachinery v0.31.3 k8s.io/apimachinery v0.31.3
k8s.io/client-go v0.31.3 k8s.io/client-go v0.31.3
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
resty.dev/v3 v3.0.0-beta.2
sigs.k8s.io/controller-runtime v0.19.4 sigs.k8s.io/controller-runtime v0.19.4
volcano.sh/apis v1.11.0 volcano.sh/apis v1.11.0
) )
...@@ -70,8 +69,10 @@ require ( ...@@ -70,8 +69,10 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect github.com/x448/float16 v0.8.4 // indirect
......
...@@ -234,6 +234,8 @@ k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7F ...@@ -234,6 +234,8 @@ k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7F
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
resty.dev/v3 v3.0.0-beta.2 h1:xu4mGAdbCLuc3kbk7eddWfWm4JfhwDtdapwss5nCjnQ=
resty.dev/v3 v3.0.0-beta.2/go.mod h1:OgkqiPvTDtOuV4MGZuUDhwOpkY8enjOsjjMzeOHefy4=
sigs.k8s.io/controller-runtime v0.19.4 h1:SUmheabttt0nx8uJtoII4oIP27BVVvAKFvdvGFwV/Qo= sigs.k8s.io/controller-runtime v0.19.4 h1:SUmheabttt0nx8uJtoII4oIP27BVVvAKFvdvGFwV/Qo=
sigs.k8s.io/controller-runtime v0.19.4/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= sigs.k8s.io/controller-runtime v0.19.4/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"context"
"os"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/consts"
)
// GetYataiImageBuilderNamespace returns the value of the environment variable
// named by consts.EnvYataiImageBuilderNamespace (empty when it is unset).
// ctx is not used, and the returned error is always nil.
func GetYataiImageBuilderNamespace(ctx context.Context) (namespace string, err error) {
	namespace = os.Getenv(consts.EnvYataiImageBuilderNamespace)
	return namespace, nil
}
// DockerRegistryConfig holds Docker registry settings. The yaml tags give the
// snake_case key used for each field.
type DockerRegistryConfig struct {
	// Repository names for bentos and models.
	BentoRepositoryName string `yaml:"bento_repository_name"`
	ModelRepositoryName string `yaml:"model_repository_name"`

	// Registry server addresses.
	Server          string `yaml:"server"`
	InClusterServer string `yaml:"in_cluster_server"`

	// Credentials and the "secure" flag.
	Username string `yaml:"username"`
	Password string `yaml:"password"`
	Secure   bool   `yaml:"secure"`
}
// GetDockerRegistryConfig builds a DockerRegistryConfig from the environment
// variables named by the consts.EnvDockerRegistry* constants. Unset variables
// produce empty strings; Secure is true only when its variable equals exactly
// "true". The returned error is always nil.
func GetDockerRegistryConfig() (conf *DockerRegistryConfig, err error) {
	conf = new(DockerRegistryConfig)
	conf.BentoRepositoryName = os.Getenv(consts.EnvDockerRegistryBentoRepositoryName)
	conf.ModelRepositoryName = os.Getenv(consts.EnvDockerRegistryModelRepositoryName)
	conf.Server = os.Getenv(consts.EnvDockerRegistryServer)
	conf.InClusterServer = os.Getenv(consts.EnvDockerRegistryInClusterServer)
	conf.Username = os.Getenv(consts.EnvDockerRegistryUsername)
	conf.Password = os.Getenv(consts.EnvDockerRegistryPassword)
	conf.Secure = os.Getenv(consts.EnvDockerRegistrySecure) == "true"
	return conf, nil
}
// YataiConfig holds the endpoint, cluster name and API token settings. The
// yaml tags give the snake_case key used for each field.
type YataiConfig struct {
	// Endpoint uses the yaml key "endpoint".
	Endpoint string `yaml:"endpoint"`
	// ClusterName uses the yaml key "cluster_name".
	ClusterName string `yaml:"cluster_name"`
	// ApiToken uses the yaml key "api_token".
	ApiToken string `yaml:"api_token"`
}
// GetYataiConfig builds a YataiConfig from the environment variables named by
// consts.EnvYataiEndpoint, consts.EnvYataiClusterName and
// consts.EnvYataiApiToken. Unset variables produce empty strings. ctx is not
// used, and the returned error is always nil.
func GetYataiConfig(ctx context.Context) (conf *YataiConfig, err error) {
	conf = new(YataiConfig)
	conf.Endpoint = os.Getenv(consts.EnvYataiEndpoint)
	conf.ClusterName = os.Getenv(consts.EnvYataiClusterName)
	conf.ApiToken = os.Getenv(consts.EnvYataiApiToken)
	return conf, nil
}
// getEnv returns the value of the environment variable key, or fallback when
// the variable is not set at all. A variable that is set to the empty string
// is returned as "" rather than replaced by fallback.
func getEnv(key, fallback string) string {
	value, found := os.LookupEnv(key)
	if !found {
		return fallback
	}
	return value
}
// InternalImages holds one image reference string per internal image
// (bento downloader, kaniko, metrics transformer, buildkit and rootless
// buildkit).
type InternalImages struct {
	BentoDownloader    string
	Kaniko             string
	MetricsTransformer string
	Buildkit           string
	BuildkitRootless   string
}
// GetInternalImages returns the internal image references. Each field is read
// from its consts.EnvInternalImages* environment variable and falls back to
// the matching consts.InternalImages*Default value when that variable is not
// set.
func GetInternalImages() (conf *InternalImages) {
	return &InternalImages{
		BentoDownloader:    getEnv(consts.EnvInternalImagesBentoDownloader, consts.InternalImagesBentoDownloaderDefault),
		Kaniko:             getEnv(consts.EnvInternalImagesKaniko, consts.InternalImagesKanikoDefault),
		MetricsTransformer: getEnv(consts.EnvInternalImagesMetricsTransformer, consts.InternalImagesMetricsTransformerDefault),
		Buildkit:           getEnv(consts.EnvInternalImagesBuildkit, consts.InternalImagesBuildkitDefault),
		BuildkitRootless:   getEnv(consts.EnvInternalImagesBuildkitRootless, consts.InternalImagesBuildkitRootlessDefault),
	}
}
package consts
// Shared constants: defaults, header names, environment variable names and
// Kubernetes label/annotation keys.
const (
	HPACPUDefaultAverageUtilization = 80

	// Header names.
	// nolint: gosec
	YataiApiTokenHeaderName   = "X-YATAI-API-TOKEN"
	NgcOrganizationHeaderName = "Nv-Ngc-Org"
	NgcUserHeaderName         = "Nv-Actor-Id"

	// Default identifiers.
	DefaultUserId = "default"
	DefaultOrgId  = "default"

	// Bento service port and port names.
	BentoServicePort       = 3000
	BentoServicePortName   = "http"
	BentoContainerPortName = "http"

	// Component names.
	YataiImageBuilderComponentName         = "yatai-image-builder"
	YataiDeploymentComponentName           = "yatai-deployment"
	YataiBentoDeploymentComponentApiServer = "api-server"

	// Default internal image references.
	InternalImagesBentoDownloaderDefault    = "quay.io/bentoml/bento-downloader:0.0.3"
	InternalImagesKanikoDefault             = "quay.io/bentoml/kaniko:1.9.1"
	InternalImagesMetricsTransformerDefault = "quay.io/bentoml/yatai-bento-metrics-transformer:0.0.3"
	InternalImagesBuildkitDefault           = "quay.io/bentoml/buildkit:master"
	InternalImagesBuildkitRootlessDefault   = "quay.io/bentoml/buildkit:master-rootless"

	// Environment variable names: Yatai endpoint settings.
	EnvYataiEndpoint    = "YATAI_ENDPOINT"
	EnvYataiClusterName = "YATAI_CLUSTER_NAME"
	// nolint: gosec
	EnvYataiApiToken = "YATAI_API_TOKEN"

	EnvBentoServicePort = "PORT"

	// tracking envars
	EnvYataiDeploymentUID            = "YATAI_T_DEPLOYMENT_UID"
	EnvYataiBentoDeploymentName      = "YATAI_BENTO_DEPLOYMENT_NAME"
	EnvYataiBentoDeploymentNamespace = "YATAI_BENTO_DEPLOYMENT_NAMESPACE"

	// Environment variable names: Docker registry settings.
	EnvDockerRegistryServer          = "DOCKER_REGISTRY_SERVER"
	EnvDockerRegistryInClusterServer = "DOCKER_REGISTRY_IN_CLUSTER_SERVER"
	EnvDockerRegistryUsername        = "DOCKER_REGISTRY_USERNAME"
	// nolint:gosec
	EnvDockerRegistryPassword            = "DOCKER_REGISTRY_PASSWORD"
	EnvDockerRegistrySecure              = "DOCKER_REGISTRY_SECURE"
	EnvDockerRegistryBentoRepositoryName = "DOCKER_REGISTRY_BENTO_REPOSITORY_NAME"
	EnvDockerRegistryModelRepositoryName = "DOCKER_REGISTRY_MODEL_REPOSITORY_NAME"

	// Environment variable names: internal image overrides.
	EnvInternalImagesBentoDownloader    = "INTERNAL_IMAGES_BENTO_DOWNLOADER"
	EnvInternalImagesKaniko             = "INTERNAL_IMAGES_KANIKO"
	EnvInternalImagesMetricsTransformer = "INTERNAL_IMAGES_METRICS_TRANSFORMER"
	EnvInternalImagesBuildkit           = "INTERNAL_IMAGES_BUILDKIT"
	EnvInternalImagesBuildkitRootless   = "INTERNAL_IMAGES_BUILDKIT_ROOTLESS"

	// Environment variable names: namespaces.
	EnvYataiSystemNamespace       = "YATAI_SYSTEM_NAMESPACE"
	EnvYataiImageBuilderNamespace = "YATAI_IMAGE_BUILDER_NAMESPACE"
	EnvYataiDeploymentNamespace   = "YATAI_DEPLOYMENT_NAMESPACE"
	EnvBentoDeploymentNamespaces  = "BENTO_DEPLOYMENT_NAMESPACES"
	EnvImageBuildersNamespace     = "IMAGE_BUILDERS_NAMESPACE"

	// Label keys under the "yatai.ai/" prefix.
	KubeLabelYataiSelector                     = "yatai.ai/selector"
	KubeLabelYataiBentoRepository              = "yatai.ai/bento-repository"
	KubeLabelYataiBento                        = "yatai.ai/bento"
	KubeLabelYataiModelRepository              = "yatai.ai/model-repository"
	KubeLabelYataiModel                        = "yatai.ai/model"
	KubeLabelYataiBentoDeployment              = "yatai.ai/bento-deployment"
	KubeLabelYataiBentoDeploymentComponentType = "yatai.ai/bento-deployment-component-type"
	KubeLabelYataiBentoDeploymentTargetType    = "yatai.ai/bento-deployment-target-type"
	KubeLabelBentoRepository                   = "yatai.ai/bento-repository"
	KubeLabelBentoVersion                      = "yatai.ai/bento-version"
	KubeLabelCreator                           = "yatai.ai/creator"
	KubeLabelIsBentoImageBuilder               = "yatai.ai/is-bento-image-builder"
	KubeLabelIsModelSeeder                     = "yatai.ai/is-model-seeder"
	KubeLabelBentoRequest                      = "yatai.ai/bento-request"

	// Label values.
	KubeLabelValueFalse = "false"
	KubeLabelValueTrue  = "true"

	// Pod label keys.
	KubeLabelYataiImageBuilderPod = "yatai.ai/yatai-image-builder-pod"
	KubeLabelBentoDeploymentPod   = "yatai.ai/bento-deployment-pod"

	// Annotation keys.
	KubeAnnotationBentoRepository                 = "yatai.ai/bento-repository"
	KubeAnnotationBentoVersion                    = "yatai.ai/bento-version"
	KubeAnnotationDockerRegistryInsecure          = "yatai.ai/docker-registry-insecure"
	KubeAnnotationYataiImageBuilderSeparateModels = "yatai.ai/yatai-image-builder-separate-models"
	KubeAnnotationIsMultiTenancy                  = "yatai.ai/is-multi-tenancy"

	KubeResourceGPUNvidia = "nvidia.com/gpu"

	// nolint: gosec
	KubeSecretNameRegcred = "yatai-regcred"
)
...@@ -22,8 +22,6 @@ import ( ...@@ -22,8 +22,6 @@ import (
"strings" "strings"
"dario.cat/mergo" "dario.cat/mergo"
"emperror.dev/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
...@@ -107,7 +105,7 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -107,7 +105,7 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
}() }()
// fetch the DynamoNIMConfig // fetch the DynamoNIMConfig
dynamoNIMConfig, err := nim.GetDynamoNIMConfig(ctx, dynamoDeployment, r.getSecret, r.Recorder) dynamoNIMConfig, err := nim.GetDynamoNIMConfig(ctx, dynamoDeployment, r.Recorder)
if err != nil { if err != nil {
reason = "failed_to_get_the_DynamoNIMConfig" reason = "failed_to_get_the_DynamoNIMConfig"
return ctrl.Result{}, err return ctrl.Result{}, err
...@@ -179,12 +177,6 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -179,12 +177,6 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
} }
func (r *DynamoDeploymentReconciler) getSecret(ctx context.Context, namespace, name string) (*corev1.Secret, error) {
secret := &corev1.Secret{}
err := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, secret)
return secret, errors.Wrap(err, "get secret")
}
// SetupWithManager sets up the controller with the Manager. // SetupWithManager sets up the controller with the Manager.
func (r *DynamoDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *DynamoDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr). return ctrl.NewControllerManagedBy(mgr).
......
...@@ -21,10 +21,8 @@ package controller ...@@ -21,10 +21,8 @@ package controller
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"os" "os"
"reflect"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
...@@ -38,19 +36,16 @@ import ( ...@@ -38,19 +36,16 @@ import (
"emperror.dev/errors" "emperror.dev/errors"
dynamoCommon "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common" dynamoCommon "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/modelschemas" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemas"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemasv1"
yataiclient "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/yatai-client" yataiclient "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/yatai-client"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1"
commonconfig "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/config"
commonconsts "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/controller_common" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/envoy" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/envoy"
commonconfig "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/config"
commonconsts "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/consts"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/system"
"github.com/cisco-open/k8s-objectmatcher/patch" "github.com/cisco-open/k8s-objectmatcher/patch"
"github.com/huandu/xstrings" "github.com/huandu/xstrings"
"github.com/jinzhu/copier" "github.com/jinzhu/copier"
"github.com/prometheus/common/version"
istioNetworking "istio.io/api/networking/v1beta1" istioNetworking "istio.io/api/networking/v1beta1"
networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1" networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors"
...@@ -59,20 +54,16 @@ import ( ...@@ -59,20 +54,16 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/reconcile"
compounadaiConversion "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/conversion"
) )
const ( const (
...@@ -395,131 +386,6 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl. ...@@ -395,131 +386,6 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
modified = true modified = true
} }
if yataiClient != nil && clusterName != nil {
yataiClient_ := *yataiClient
clusterName_ := *clusterName
dynamoNimRepositoryName, dynamoNimVersion := getDynamoNimRepositoryNameAndDynamoNimVersion(dynamoNimCR)
_, err = yataiClient_.GetBento(ctx, dynamoNimRepositoryName, dynamoNimVersion)
dynamoNimIsNotFound := isNotFoundError(err)
if err != nil && !dynamoNimIsNotFound {
return
}
if dynamoNimIsNotFound {
dynamoNimDeployment, err = r.setStatusConditions(ctx, req,
metav1.Condition{
Type: v1alpha1.DynamoDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: "Remote dynamoNim from Yatai is not found",
},
)
return
}
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "GetYataiDeployment", "Fetching yatai deployment %s", dynamoNimDeployment.Name)
var oldYataiDeployment *schemasv1.DeploymentSchema
oldYataiDeployment, err = yataiClient_.GetDeployment(ctx, clusterName_, dynamoNimDeployment.Namespace, dynamoNimDeployment.Name)
isNotFound := isNotFoundError(err)
if err != nil && !isNotFound {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "GetYataiDeployment", "Failed to fetch yatai deployment %s: %s", dynamoNimDeployment.Name, err)
return
}
err = nil
envs := make([]*modelschemas.LabelItemSchema, 0)
specEnvs := dynamoNimDeployment.Spec.Envs
for _, env := range specEnvs {
envs = append(envs, &modelschemas.LabelItemSchema{
Key: env.Name,
Value: env.Value,
})
}
var hpaConf *modelschemas.DeploymentTargetHPAConf
hpaConf, err = TransformToOldHPA(dynamoNimDeployment.Spec.Autoscaling)
if err != nil {
return
}
deploymentTargets := make([]*schemasv1.CreateDeploymentTargetSchema, 0, 1)
deploymentTarget := &schemasv1.CreateDeploymentTargetSchema{
DeploymentTargetTypeSchema: schemasv1.DeploymentTargetTypeSchema{
Type: modelschemas.DeploymentTargetTypeStable,
},
BentoRepository: dynamoNimRepositoryName,
Bento: dynamoNimVersion,
Config: &modelschemas.DeploymentTargetConfig{
KubeResourceUid: string(dynamoNimDeployment.UID),
KubeResourceVersion: dynamoNimDeployment.ResourceVersion,
Resources: compounadaiConversion.ConvertToDeploymentTargetResources(dynamoNimDeployment.Spec.Resources),
HPAConf: hpaConf,
Envs: &envs,
EnableIngress: &dynamoNimDeployment.Spec.Ingress.Enabled,
EnableStealingTrafficDebugMode: &[]bool{checkIfIsStealingTrafficDebugModeEnabled(dynamoNimDeployment.Spec.Annotations)}[0],
EnableDebugMode: &[]bool{checkIfIsDebugModeEnabled(dynamoNimDeployment.Spec.Annotations)}[0],
EnableDebugPodReceiveProductionTraffic: &[]bool{checkIfIsDebugPodReceiveProductionTrafficEnabled(dynamoNimDeployment.Spec.Annotations)}[0],
BentoDeploymentOverrides: &modelschemas.ApiServerBentoDeploymentOverrides{
MonitorExporter: dynamoNimDeployment.Spec.MonitorExporter,
ExtraPodMetadata: dynamoNimDeployment.Spec.ExtraPodMetadata,
ExtraPodSpec: dynamoNimDeployment.Spec.ExtraPodSpec,
},
BentoRequestOverrides: &modelschemas.BentoRequestOverrides{
ImageBuildTimeout: dynamoNimRequest.Spec.ImageBuildTimeout,
ImageBuilderExtraPodSpec: dynamoNimRequest.Spec.ImageBuilderExtraPodSpec,
ImageBuilderExtraPodMetadata: dynamoNimRequest.Spec.ImageBuilderExtraPodMetadata,
ImageBuilderExtraContainerEnv: dynamoNimRequest.Spec.ImageBuilderExtraContainerEnv,
ImageBuilderContainerResources: dynamoNimRequest.Spec.ImageBuilderContainerResources,
DockerConfigJSONSecretName: dynamoNimRequest.Spec.DockerConfigJSONSecretName,
DownloaderContainerEnvFrom: dynamoNimRequest.Spec.DownloaderContainerEnvFrom,
},
},
}
deploymentTargets = append(deploymentTargets, deploymentTarget)
updateSchema := &schemasv1.UpdateDeploymentSchema{
Targets: deploymentTargets,
DoNotDeploy: true,
}
if isNotFound {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "CreateYataiDeployment", "Creating yatai deployment %s", dynamoNimDeployment.Name)
_, err = yataiClient_.CreateDeployment(ctx, clusterName_, &schemasv1.CreateDeploymentSchema{
Name: dynamoNimDeployment.Name,
KubeNamespace: dynamoNimDeployment.Namespace,
UpdateDeploymentSchema: *updateSchema,
})
if err != nil {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "CreateYataiDeployment", "Failed to create yatai deployment %s: %s", dynamoNimDeployment.Name, err)
return
}
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "CreateYataiDeployment", "Created yatai deployment %s", dynamoNimDeployment.Name)
} else {
noChange := false
if oldYataiDeployment != nil && oldYataiDeployment.LatestRevision != nil && len(oldYataiDeployment.LatestRevision.Targets) > 0 {
oldYataiDeployment.LatestRevision.Targets[0].Config.KubeResourceUid = updateSchema.Targets[0].Config.KubeResourceUid
oldYataiDeployment.LatestRevision.Targets[0].Config.KubeResourceVersion = updateSchema.Targets[0].Config.KubeResourceVersion
noChange = reflect.DeepEqual(oldYataiDeployment.LatestRevision.Targets[0].Config, updateSchema.Targets[0].Config)
}
if noChange {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateYataiDeployment", "No change in yatai deployment %s, skipping", dynamoNimDeployment.Name)
} else {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateYataiDeployment", "Updating yatai deployment %s", dynamoNimDeployment.Name)
_, err = yataiClient_.UpdateDeployment(ctx, clusterName_, dynamoNimDeployment.Namespace, dynamoNimDeployment.Name, updateSchema)
if err != nil {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "UpdateYataiDeployment", "Failed to update yatai deployment %s: %s", dynamoNimDeployment.Name, err)
return
}
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateYataiDeployment", "Updated yatai deployment %s", dynamoNimDeployment.Name)
}
}
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "SyncYataiDeploymentStatus", "Syncing yatai deployment %s status", dynamoNimDeployment.Name)
_, err = yataiClient_.SyncDeploymentStatus(ctx, clusterName_, dynamoNimDeployment.Namespace, dynamoNimDeployment.Name)
if err != nil {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "SyncYataiDeploymentStatus", "Failed to sync yatai deployment %s status: %s", dynamoNimDeployment.Name, err)
return
}
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "SyncYataiDeploymentStatus", "Synced yatai deployment %s status", dynamoNimDeployment.Name)
}
if !modified { if !modified {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateYataiDeployment", "No changes to yatai deployment %s", dynamoNimDeployment.Name) r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateYataiDeployment", "No changes to yatai deployment %s", dynamoNimDeployment.Name)
} }
...@@ -537,14 +403,6 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl. ...@@ -537,14 +403,6 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
return return
} }
// isNotFoundError reports whether err's message suggests a missing resource.
//
// The check is purely textual: the lower-cased message is searched for
// "not found", "could not find" or "404". A nil error is never a match.
func isNotFoundError(err error) bool {
	if err == nil {
		return false
	}

	lowered := strings.ToLower(err.Error())
	for _, marker := range []string{"not found", "could not find", "404"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
func (r *DynamoNimDeploymentReconciler) reconcilePVC(ctx context.Context, crd *v1alpha1.DynamoNimDeployment) (*corev1.PersistentVolumeClaim, error) { func (r *DynamoNimDeploymentReconciler) reconcilePVC(ctx context.Context, crd *v1alpha1.DynamoNimDeployment) (*corev1.PersistentVolumeClaim, error) {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
if crd.Spec.PVC == nil { if crd.Spec.PVC == nil {
...@@ -611,21 +469,12 @@ var cachedYataiConf *commonconfig.YataiConfig ...@@ -611,21 +469,12 @@ var cachedYataiConf *commonconfig.YataiConfig
//nolint:nakedret //nolint:nakedret
func (r *DynamoNimDeploymentReconciler) getYataiClient(ctx context.Context) (yataiClient **yataiclient.YataiClient, clusterName *string, err error) { func (r *DynamoNimDeploymentReconciler) getYataiClient(ctx context.Context) (yataiClient **yataiclient.YataiClient, clusterName *string, err error) {
restConfig := config.GetConfigOrDie()
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
err = errors.Wrapf(err, "create kubernetes clientset")
return
}
var yataiConf *commonconfig.YataiConfig var yataiConf *commonconfig.YataiConfig
if cachedYataiConf != nil { if cachedYataiConf != nil {
yataiConf = cachedYataiConf yataiConf = cachedYataiConf
} else { } else {
yataiConf, err = commonconfig.GetYataiConfig(ctx, func(ctx context.Context, namespace, name string) (*corev1.Secret, error) { yataiConf, err = commonconfig.GetYataiConfig(ctx)
secret, err := clientset.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
return secret, errors.Wrap(err, "get secret")
}, commonconsts.YataiDeploymentComponentName, false)
isNotFound := k8serrors.IsNotFound(err) isNotFound := k8serrors.IsNotFound(err)
if err != nil && !isNotFound { if err != nil && !isNotFound {
err = errors.Wrap(err, "get yatai config") err = errors.Wrap(err, "get yatai config")
...@@ -1234,130 +1083,15 @@ type createOrUpdateIngressOption struct { ...@@ -1234,130 +1083,15 @@ type createOrUpdateIngressOption struct {
//nolint:nakedret //nolint:nakedret
func (r *DynamoNimDeploymentReconciler) createOrUpdateIngresses(ctx context.Context, opt createOrUpdateIngressOption) (modified bool, err error) { func (r *DynamoNimDeploymentReconciler) createOrUpdateIngresses(ctx context.Context, opt createOrUpdateIngressOption) (modified bool, err error) {
logs := log.FromContext(ctx)
dynamoNimDeployment := opt.dynamoNimDeployment dynamoNimDeployment := opt.dynamoNimDeployment
dynamoNim := opt.dynamoNim
modified, err = r.createOrUpdateVirtualService(ctx, dynamoNimDeployment) modified, err = r.createOrUpdateVirtualService(ctx, dynamoNimDeployment)
if err != nil { if err != nil {
return false, err return false, err
} }
return modified, nil
// generateIngresses generates an ingress and actively waits for the ingress to come online ....
// so disabling it for now unless explicitly enabled
if !opt.dynamoNimDeployment.Spec.Ingress.Enabled || (opt.dynamoNimDeployment.Spec.Ingress.UseVirtualService != nil && *opt.dynamoNimDeployment.Spec.Ingress.UseVirtualService) {
return false, nil
}
ingresses, err := r.generateIngresses(ctx, generateIngressesOption{
yataiClient: opt.yataiClient,
dynamoNimDeployment: dynamoNimDeployment,
dynamoNim: dynamoNim,
})
if err != nil {
return
}
for _, ingress := range ingresses {
logs := logs.WithValues("namespace", ingress.Namespace, "ingressName", ingress.Name)
ingressNamespacedName := fmt.Sprintf("%s/%s", ingress.Namespace, ingress.Name)
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "GetIngress", "Getting Ingress %s", ingressNamespacedName)
oldIngress := &networkingv1.Ingress{}
err = r.Get(ctx, types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, oldIngress)
oldIngressIsNotFound := k8serrors.IsNotFound(err)
if err != nil && !oldIngressIsNotFound {
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "GetIngress", "Failed to get Ingress %s: %s", ingressNamespacedName, err)
logs.Error(err, "Failed to get Ingress.")
return
}
err = nil
if oldIngressIsNotFound {
if !dynamoNimDeployment.Spec.Ingress.Enabled {
logs.Info("Ingress not enabled. Skipping.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "GetIngress", "Skipping Ingress %s", ingressNamespacedName)
continue
}
logs.Info("Ingress not found. Creating a new one.")
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(ingress), "set last applied annotation for ingress %s", ingress.Name)
if err != nil {
logs.Error(err, "Failed to set last applied annotation.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "SetLastAppliedAnnotation", "Failed to set last applied annotation for Ingress %s: %s", ingressNamespacedName, err)
return
}
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "CreateIngress", "Creating a new Ingress %s", ingressNamespacedName)
err = r.Create(ctx, ingress)
if err != nil {
logs.Error(err, "Failed to create Ingress.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "CreateIngress", "Failed to create Ingress %s: %s", ingressNamespacedName, err)
return
}
logs.Info("Ingress created.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "CreateIngress", "Created Ingress %s", ingressNamespacedName)
modified = true
} else {
logs.Info("Ingress found.")
if !dynamoNimDeployment.Spec.Ingress.Enabled {
logs.Info("Ingress not enabled. Deleting.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "DeleteIngress", "Deleting Ingress %s", ingressNamespacedName)
err = r.Delete(ctx, ingress)
if err != nil {
logs.Error(err, "Failed to delete Ingress.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "DeleteIngress", "Failed to delete Ingress %s: %s", ingressNamespacedName, err)
return
}
logs.Info("Ingress deleted.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "DeleteIngress", "Deleted Ingress %s", ingressNamespacedName)
modified = true
continue
}
// Keep host unchanged
ingress.Spec.Rules[0].Host = oldIngress.Spec.Rules[0].Host
var patchResult *patch.PatchResult
patchResult, err = patch.DefaultPatchMaker.Calculate(oldIngress, ingress)
if err != nil {
logs.Error(err, "Failed to calculate patch.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "CalculatePatch", "Failed to calculate patch for Ingress %s: %s", ingressNamespacedName, err)
return
}
if !patchResult.IsEmpty() {
logs.Info("Ingress spec is different. Updating Ingress.")
err = errors.Wrapf(patch.DefaultAnnotator.SetLastAppliedAnnotation(ingress), "set last applied annotation for ingress %s", ingress.Name)
if err != nil {
logs.Error(err, "Failed to set last applied annotation.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "SetLastAppliedAnnotation", "Failed to set last applied annotation for Ingress %s: %s", ingressNamespacedName, err)
return
}
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateIngress", "Updating Ingress %s", ingressNamespacedName)
err = r.Update(ctx, ingress)
if err != nil {
logs.Error(err, "Failed to update Ingress.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "UpdateIngress", "Failed to update Ingress %s: %s", ingressNamespacedName, err)
return
}
logs.Info("Ingress updated.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateIngress", "Updated Ingress %s", ingressNamespacedName)
modified = true
} else {
logs.Info("Ingress spec is the same. Skipping update.")
r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "UpdateIngress", "Skipping update Ingress %s", ingressNamespacedName)
}
}
}
return
} }
func (r *DynamoNimDeploymentReconciler) getKubeName(dynamoNimDeployment *v1alpha1.DynamoNimDeployment, _ *v1alpha1.DynamoNim, debug bool) string { func (r *DynamoNimDeploymentReconciler) getKubeName(dynamoNimDeployment *v1alpha1.DynamoNimDeployment, _ *v1alpha1.DynamoNim, debug bool) string {
...@@ -1451,9 +1185,9 @@ func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context, ...@@ -1451,9 +1185,9 @@ func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context,
resourceAnnotations := getResourceAnnotations(opt.dynamoNimDeployment) resourceAnnotations := getResourceAnnotations(opt.dynamoNimDeployment)
strategyStr := resourceAnnotations[KubeAnnotationDeploymentStrategy] strategyStr := resourceAnnotations[KubeAnnotationDeploymentStrategy]
if strategyStr != "" { if strategyStr != "" {
strategyType := modelschemas.DeploymentStrategy(strategyStr) strategyType := schemas.DeploymentStrategy(strategyStr)
switch strategyType { switch strategyType {
case modelschemas.DeploymentStrategyRollingUpdate: case schemas.DeploymentStrategyRollingUpdate:
strategy = appsv1.DeploymentStrategy{ strategy = appsv1.DeploymentStrategy{
Type: appsv1.RollingUpdateDeploymentStrategyType, Type: appsv1.RollingUpdateDeploymentStrategyType,
RollingUpdate: &appsv1.RollingUpdateDeployment{ RollingUpdate: &appsv1.RollingUpdateDeployment{
...@@ -1461,11 +1195,11 @@ func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context, ...@@ -1461,11 +1195,11 @@ func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context,
MaxUnavailable: &defaultMaxUnavailable, MaxUnavailable: &defaultMaxUnavailable,
}, },
} }
case modelschemas.DeploymentStrategyRecreate: case schemas.DeploymentStrategyRecreate:
strategy = appsv1.DeploymentStrategy{ strategy = appsv1.DeploymentStrategy{
Type: appsv1.RecreateDeploymentStrategyType, Type: appsv1.RecreateDeploymentStrategyType,
} }
case modelschemas.DeploymentStrategyRampedSlowRollout: case schemas.DeploymentStrategyRampedSlowRollout:
strategy = appsv1.DeploymentStrategy{ strategy = appsv1.DeploymentStrategy{
Type: appsv1.RollingUpdateDeploymentStrategyType, Type: appsv1.RollingUpdateDeploymentStrategyType,
RollingUpdate: &appsv1.RollingUpdateDeployment{ RollingUpdate: &appsv1.RollingUpdateDeployment{
...@@ -1473,7 +1207,7 @@ func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context, ...@@ -1473,7 +1207,7 @@ func (r *DynamoNimDeploymentReconciler) generateDeployment(ctx context.Context,
MaxUnavailable: &[]intstr.IntOrString{intstr.FromInt(0)}[0], MaxUnavailable: &[]intstr.IntOrString{intstr.FromInt(0)}[0],
}, },
} }
case modelschemas.DeploymentStrategyBestEffortControlledRollout: case schemas.DeploymentStrategyBestEffortControlledRollout:
strategy = appsv1.DeploymentStrategy{ strategy = appsv1.DeploymentStrategy{
Type: appsv1.RollingUpdateDeploymentStrategyType, Type: appsv1.RollingUpdateDeploymentStrategyType,
RollingUpdate: &appsv1.RollingUpdateDeployment{ RollingUpdate: &appsv1.RollingUpdateDeployment{
...@@ -1697,37 +1431,6 @@ func (r *DynamoNimDeploymentReconciler) generatePodTemplateSpec(ctx context.Cont ...@@ -1697,37 +1431,6 @@ func (r *DynamoNimDeploymentReconciler) generatePodTemplateSpec(ctx context.Cont
}) })
} }
if opt.yataiClient != nil {
yataiClient := *opt.yataiClient
var cluster *schemasv1.ClusterFullSchema
clusterName := DefaultClusterName
if opt.clusterName != nil {
clusterName = *opt.clusterName
}
cluster, err = yataiClient.GetCluster(ctx, clusterName)
if err != nil {
return
}
var version *schemasv1.VersionSchema
version, err = yataiClient.GetVersion(ctx)
if err != nil {
return
}
defaultEnvs = append(defaultEnvs, []corev1.EnvVar{
{
Name: commonconsts.EnvYataiVersion,
Value: fmt.Sprintf("%s-%s", version.Version, version.GitCommit),
},
{
Name: commonconsts.EnvYataiClusterUID,
Value: cluster.Uid,
},
}...)
}
for _, env := range defaultEnvs { for _, env := range defaultEnvs {
if _, ok := envsSeen[env.Name]; !ok { if _, ok := envsSeen[env.Name]; !ok {
envs = append(envs, env) envs = append(envs, env)
...@@ -2571,37 +2274,6 @@ func (r *DynamoNimDeploymentReconciler) generateService(opt generateServiceOptio ...@@ -2571,37 +2274,6 @@ func (r *DynamoNimDeploymentReconciler) generateService(opt generateServiceOptio
return return
} }
// generateIngressHost returns the hostname to use for the ingress of the given
// DynamoNimDeployment. It delegates unchanged to generateDefaultHostname and
// returns that call's result and error as-is.
func (r *DynamoNimDeploymentReconciler) generateIngressHost(ctx context.Context, dynamoNimDeployment *v1alpha1.DynamoNimDeployment) (string, error) {
return r.generateDefaultHostname(ctx, dynamoNimDeployment)
}
// cachedDomainSuffix memoizes the domain suffix after the first successful
// lookup. It is read and written below without any locking.
var cachedDomainSuffix *string

// generateDefaultHostname builds the default hostname for a deployment in the
// form "<name>-<namespace>.<domain suffix>".
//
// The domain suffix is fetched through system.GetDomainSuffix on first use and
// then served from cachedDomainSuffix. A failed lookup is not cached.
func (r *DynamoNimDeploymentReconciler) generateDefaultHostname(ctx context.Context, dynamoNimDeployment *v1alpha1.DynamoNimDeployment) (string, error) {
	hostFor := func(suffix string) string {
		return fmt.Sprintf("%s-%s.%s", dynamoNimDeployment.Name, dynamoNimDeployment.Namespace, suffix)
	}

	// Fast path: the suffix has already been resolved.
	if cachedDomainSuffix != nil {
		return hostFor(*cachedDomainSuffix), nil
	}

	clientset, err := kubernetes.NewForConfig(config.GetConfigOrDie())
	if err != nil {
		return "", errors.Wrapf(err, "create kubernetes clientset")
	}

	readConfigMap := func(ctx context.Context, namespace, name string) (*corev1.ConfigMap, error) {
		cm, getErr := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
		return cm, errors.Wrap(getErr, "get configmap")
	}

	suffix, err := system.GetDomainSuffix(ctx, readConfigMap, clientset)
	if err != nil {
		return "", errors.Wrapf(err, "get domain suffix")
	}

	cachedDomainSuffix = &suffix
	return hostFor(suffix), nil
}
type TLSModeOpt string type TLSModeOpt string
const ( const (
...@@ -2619,402 +2291,8 @@ type IngressConfig struct { ...@@ -2619,402 +2291,8 @@ type IngressConfig struct {
StaticTLSSecretName string StaticTLSSecretName string
} }
// cachedIngressConfig memoizes the parsed network configuration after the
// first successful load. It is read and written without any locking.
var cachedIngressConfig *IngressConfig

// GetIngressConfig loads the ingress settings from the network-config
// ConfigMap and returns them as an IngressConfig.
//
// The result of the first successful call is cached in cachedIngressConfig
// and returned by later calls without contacting the API server. Failures are
// never cached.
//
// Defaults applied when a key is empty: path "/", path type
// ImplementationSpecific, TLS mode none. An error is returned when:
//   - the clientset cannot be created or the ConfigMap cannot be fetched;
//   - the ingress annotations value is not valid JSON;
//   - "ingress-tls-mode" is anything other than "", "none", "auto" or "static";
//   - the TLS mode is "static" but "ingress-static-tls-secret-name" is empty.
//
// On error the returned config is always nil.
func (r *DynamoNimDeploymentReconciler) GetIngressConfig(ctx context.Context) (ingressConfig *IngressConfig, err error) {
	if cachedIngressConfig != nil {
		return cachedIngressConfig, nil
	}

	clientset, err := kubernetes.NewForConfig(config.GetConfigOrDie())
	if err != nil {
		return nil, errors.Wrap(err, "create kubernetes clientset")
	}

	configMap, err := system.GetNetworkConfigConfigMap(ctx, func(ctx context.Context, namespace, name string) (*corev1.ConfigMap, error) {
		configmap, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
		return configmap, errors.Wrap(err, "get network config configmap")
	})
	if err != nil {
		return nil, errors.Wrapf(err, "failed to get configmap %s", commonconsts.KubeConfigMapNameNetworkConfig)
	}

	// An empty class name means "leave IngressClassName unset".
	var className *string
	if name := strings.TrimSpace(configMap.Data[commonconsts.KubeConfigMapKeyNetworkConfigIngressClass]); name != "" {
		className = &name
	}

	annotations := make(map[string]string)
	if raw := strings.TrimSpace(configMap.Data[commonconsts.KubeConfigMapKeyNetworkConfigIngressAnnotations]); raw != "" {
		if err := json.Unmarshal([]byte(raw), &annotations); err != nil {
			return nil, errors.Wrapf(err, "failed to json unmarshal %s in configmap %s: %s", commonconsts.KubeConfigMapKeyNetworkConfigIngressAnnotations, commonconsts.KubeConfigMapNameNetworkConfig, raw)
		}
	}

	path := strings.TrimSpace(configMap.Data["ingress-path"])
	if path == "" {
		path = "/"
	}

	pathType := networkingv1.PathTypeImplementationSpecific
	if raw := strings.TrimSpace(configMap.Data["ingress-path-type"]); raw != "" {
		pathType = networkingv1.PathType(raw)
	}

	tlsMode := TLSModeNone
	switch tlsModeStr := strings.TrimSpace(configMap.Data["ingress-tls-mode"]); tlsModeStr {
	case "", "none":
		// keep TLSModeNone
	case "auto", "static":
		tlsMode = TLSModeOpt(tlsModeStr)
	default:
		// A fresh error is required here: wrapping the (nil) err would
		// yield nil and let the caller proceed with a nil config.
		return nil, errors.Errorf("invalid TLS mode: %q", tlsModeStr)
	}

	staticTLSSecretName := strings.TrimSpace(configMap.Data["ingress-static-tls-secret-name"])
	if tlsMode == TLSModeStatic && staticTLSSecretName == "" {
		// Same as above: err is nil at this point, so create a new error.
		return nil, errors.New("TLS mode is static but ingress-static-tls-secret-name isn't set")
	}

	ingressConfig = &IngressConfig{
		ClassName:           className,
		Annotations:         annotations,
		Path:                path,
		PathType:            pathType,
		TLSMode:             tlsMode,
		StaticTLSSecretName: staticTLSSecretName,
	}
	cachedIngressConfig = ingressConfig

	return ingressConfig, nil
}
// generateIngressesOption bundles the inputs passed to generateIngresses.
type generateIngressesOption struct {
// yataiClient is carried along by callers; generateIngresses itself does not read it.
yataiClient **yataiclient.YataiClient
// dynamoNimDeployment is the deployment the ingress is generated for; it
// supplies the namespace, ingress spec overrides and the controller reference owner.
dynamoNimDeployment *v1alpha1.DynamoNimDeployment
// dynamoNim is the DynamoNim used to derive the repository name and version,
// and is passed to the kube name/label/annotation helpers.
dynamoNim *v1alpha1.DynamoNim
}
// generateIngresses builds the desired networking/v1 Ingress for the
// DynamoNimDeployment in opt and returns it as a single-element slice.
//
// The ingress host comes from generateIngressHost. Annotations are layered in
// this order, later entries winning: kube annotations of the deployment, the
// nginx snippet/ssl-redirect defaults set here, annotations from the network
// config, then Spec.Ingress.Annotations. Labels are the kube labels overlaid
// with Spec.Ingress.Labels.
//
// TLS defaults follow the network config's TLS mode (none, auto, static) and
// are replaced when the deployment specifies its own Spec.Ingress.TLS secret.
// The deployment is set as controller owner of the generated Ingress.
//
// An error (with a nil slice) is returned when the hostname or ingress config
// cannot be obtained, when the configured TLS mode is not one of the known
// modes, or when the controller reference cannot be set.
func (r *DynamoNimDeploymentReconciler) generateIngresses(ctx context.Context, opt generateIngressesOption) (ingresses []*networkingv1.Ingress, err error) {
	dynamoNimRepositoryName, dynamoNimVersion := getDynamoNimRepositoryNameAndDynamoNimVersion(opt.dynamoNim)
	dynamoNimDeployment := opt.dynamoNimDeployment
	dynamoNim := opt.dynamoNim

	kubeName := r.getKubeName(dynamoNimDeployment, dynamoNim, false)

	r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "GenerateIngressHost", "Generating hostname for ingress")
	internalHost, err := r.generateIngressHost(ctx, dynamoNimDeployment)
	if err != nil {
		r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeWarning, "GenerateIngressHost", "Failed to generate hostname for ingress: %v", err)
		return nil, err
	}
	r.Recorder.Eventf(dynamoNimDeployment, corev1.EventTypeNormal, "GenerateIngressHost", "Generated hostname for ingress: %s", internalHost)

	annotations := r.getKubeAnnotations(dynamoNimDeployment, dynamoNim)

	tag := fmt.Sprintf("%s:%s", dynamoNimRepositoryName, dynamoNimVersion)
	orgName := "unknown"

	annotations["nginx.ingress.kubernetes.io/configuration-snippet"] = fmt.Sprintf(`
more_set_headers "X-Powered-By: Yatai";
more_set_headers "X-Yatai-Org-Name: %s";
more_set_headers "X-Yatai-Bento: %s";
`, orgName, tag)

	annotations["nginx.ingress.kubernetes.io/ssl-redirect"] = "false"

	labels := r.getKubeLabels(dynamoNimDeployment, dynamoNim)

	ingressConfig, err := r.GetIngressConfig(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "get ingress config")
	}
	if ingressConfig == nil {
		// Defensive: never dereference a missing config.
		return nil, errors.New("get ingress config: no config returned")
	}

	// Copy the path type so the Ingress does not point into the shared config.
	ingressPathType := ingressConfig.PathType

	// Network-config annotations first, then per-deployment overrides.
	for k, v := range ingressConfig.Annotations {
		annotations[k] = v
	}
	for k, v := range dynamoNimDeployment.Spec.Ingress.Annotations {
		annotations[k] = v
	}
	for k, v := range dynamoNimDeployment.Spec.Ingress.Labels {
		labels[k] = v
	}

	// Default TLS section derived from the network config.
	var tls []networkingv1.IngressTLS
	switch ingressConfig.TLSMode {
	case TLSModeNone:
		// no TLS section
	case TLSModeAuto:
		tls = []networkingv1.IngressTLS{{
			Hosts:      []string{internalHost},
			SecretName: kubeName,
		}}
	case TLSModeStatic:
		tls = []networkingv1.IngressTLS{{
			Hosts:      []string{internalHost},
			SecretName: ingressConfig.StaticTLSSecretName,
		}}
	default:
		// err is nil here, so wrapping it would return nil and silently
		// produce no ingress; build a real error instead.
		return nil, errors.Errorf("TLS mode is invalid: %s", ingressConfig.TLSMode)
	}

	// A TLS secret on the DynamoNimDeployment replaces the default.
	if specTLS := dynamoNimDeployment.Spec.Ingress.TLS; specTLS != nil && specTLS.SecretName != "" {
		tls = []networkingv1.IngressTLS{{
			Hosts:      []string{internalHost},
			SecretName: specTLS.SecretName,
		}}
	}

	serviceName := r.getGenericServiceName(dynamoNimDeployment, dynamoNim)

	interIng := &networkingv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{
			Name:        kubeName,
			Namespace:   dynamoNimDeployment.Namespace,
			Labels:      labels,
			Annotations: annotations,
		},
		Spec: networkingv1.IngressSpec{
			IngressClassName: ingressConfig.ClassName,
			TLS:              tls,
			Rules: []networkingv1.IngressRule{
				{
					Host: internalHost,
					IngressRuleValue: networkingv1.IngressRuleValue{
						HTTP: &networkingv1.HTTPIngressRuleValue{
							Paths: []networkingv1.HTTPIngressPath{
								{
									Path:     ingressConfig.Path,
									PathType: &ingressPathType,
									Backend: networkingv1.IngressBackend{
										Service: &networkingv1.IngressServiceBackend{
											Name: serviceName,
											Port: networkingv1.ServiceBackendPort{
												Name: commonconsts.BentoServicePortName,
											},
										},
									},
								},
							},
						},
					},
				},
			},
		},
	}

	if err := ctrl.SetControllerReference(dynamoNimDeployment, interIng, r.Scheme); err != nil {
		return nil, errors.Wrapf(err, "set ingress %s controller reference", interIng.Name)
	}

	return []*networkingv1.Ingress{interIng}, nil
}
// cachedDynamoNimDeploymentNamespaces memoizes the namespace list after the
// first successful lookup. It is read and written below without any locking.
var cachedDynamoNimDeploymentNamespaces *[]string

// doCleanUpAbandonedRunnerServices deletes runner Services that no longer
// select any Pod.
//
// For every deployment namespace it lists the Services carrying the runner
// label, skips those whose selector still matches at least one Pod, skips
// those created less than three minutes ago, and deletes the rest. The whole
// pass runs under a ten-minute timeout. The first list or delete failure
// aborts the pass and is returned.
func (r *DynamoNimDeploymentReconciler) doCleanUpAbandonedRunnerServices() error {
	logger := log.Log.WithValues("func", "doCleanUpAbandonedRunnerServices")
	logger.Info("start cleaning up abandoned runner services")

	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*10)
	defer cancel()

	// Resolve the namespaces to scan, from cache when available.
	var namespaces []string
	if cached := cachedDynamoNimDeploymentNamespaces; cached != nil {
		namespaces = *cached
	} else {
		clientset, err := kubernetes.NewForConfig(config.GetConfigOrDie())
		if err != nil {
			return errors.Wrapf(err, "create kubernetes clientset")
		}

		readSecret := func(ctx context.Context, namespace, name string) (*corev1.Secret, error) {
			secret, getErr := clientset.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
			return secret, errors.Wrap(getErr, "get secret")
		}

		namespaces, err = commonconfig.GetBentoDeploymentNamespaces(ctx, readSecret)
		if err != nil {
			return errors.Wrapf(err, "get dynamoNim deployment namespaces")
		}
		cachedDynamoNimDeploymentNamespaces = &namespaces
	}

	for _, ns := range namespaces {
		var services corev1.ServiceList
		if err := r.List(ctx, &services,
			client.HasLabels{commonconsts.KubeLabelYataiBentoDeploymentRunner},
			client.InNamespace(ns),
		); err != nil {
			return errors.Wrap(err, "list services")
		}

		for i := range services.Items {
			// Work on a copy so the delete call never touches the list item.
			svc := services.Items[i]

			var pods corev1.PodList
			if err := r.List(ctx, &pods,
				client.InNamespace(svc.Namespace),
				client.MatchingLabels(svc.Spec.Selector),
			); err != nil {
				return errors.Wrap(err, "list pods")
			}

			// Still backed by at least one pod: not abandoned.
			if len(pods.Items) > 0 {
				continue
			}

			// Give freshly created services time to get their pods.
			if time.Since(svc.ObjectMeta.CreationTimestamp.Time) < time.Minute*3 {
				continue
			}

			logger.Info("deleting abandoned runner service", "name", svc.Name, "namespace", svc.Namespace)
			if err := r.Delete(ctx, &svc); err != nil {
				return errors.Wrapf(err, "delete service %s", svc.Name)
			}
		}
	}

	logger.Info("finished cleaning up abandoned runner services")
	return nil
}
// cleanUpAbandonedRunnerServices runs the abandoned-runner-service cleanup once
// immediately and then again on every tick of a 30-second ticker. Errors are
// logged and do not stop the loop; the function never returns.
func (r *DynamoNimDeploymentReconciler) cleanUpAbandonedRunnerServices() {
	logger := log.Log.WithValues("func", "cleanUpAbandonedRunnerServices")

	runOnce := func() {
		if err := r.doCleanUpAbandonedRunnerServices(); err != nil {
			logger.Error(err, "cleanUpAbandonedRunnerServices")
		}
	}

	runOnce()
	for range time.NewTicker(time.Second * 30).C {
		runOnce()
	}
}
// doRegisterDynamoComponent registers this operator with the Yatai API as the
// "deployment" component, under a five-minute timeout.
//
// It returns nil without registering when getYataiClient yields a nil client.
// Any failure to obtain the client, to resolve the deployment namespace, or to
// perform the registration call is returned (the first two wrapped with the
// failing step).
func (r *DynamoNimDeploymentReconciler) doRegisterDynamoComponent() (err error) {
	logger := log.Log.WithValues("func", "doRegisterYataiComponent")

	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*5)
	defer cancel()

	logger.Info("getting yatai client")
	clientRef, clusterName, err := r.getYataiClient(ctx)
	if err != nil {
		return errors.Wrap(err, "get yatai client")
	}
	if clientRef == nil {
		logger.Info("yatai client is nil")
		return nil
	}
	yatai := *clientRef

	// Secrets needed for the namespace lookup are read through the reconciler's client.
	secretGetter := func(ctx context.Context, namespace, name string) (*corev1.Secret, error) {
		secret := &corev1.Secret{}
		err := r.Client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, secret)
		return secret, errors.Wrap(err, "get secret")
	}
	namespace, err := commonconfig.GetYataiDeploymentNamespace(ctx, secretGetter)
	if err != nil {
		return errors.Wrap(err, "get yatai deployment namespace")
	}

	registration := &schemasv1.RegisterYataiComponentSchema{
		Name:          modelschemas.YataiComponentNameDeployment,
		KubeNamespace: namespace,
		Version:       version.Version,
		SelectorLabels: map[string]string{
			"app.kubernetes.io/name": "yatai-deployment",
		},
		Manifest: &modelschemas.YataiComponentManifestSchema{
			SelectorLabels: map[string]string{
				"app.kubernetes.io/name": "yatai-deployment",
			},
			LatestCRDVersion: "v2alpha1",
		},
	}
	_, err = yatai.RegisterYataiComponent(ctx, *clusterName, registration)
	return err
}
// registerDynamoComponent attempts the component registration once right away
// and then repeats it on every tick of a five-minute ticker. Failures are only
// logged; the function never returns.
func (r *DynamoNimDeploymentReconciler) registerDynamoComponent() {
	logger := log.Log.WithValues("func", "registerYataiComponent")

	attempt := func() {
		if err := r.doRegisterDynamoComponent(); err != nil {
			logger.Error(err, "registerYataiComponent")
		}
	}

	attempt()
	for range time.NewTicker(time.Minute * 5).C {
		attempt()
	}
}
// SetupWithManager sets up the controller with the Manager. // SetupWithManager sets up the controller with the Manager.
func (r *DynamoNimDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *DynamoNimDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
logs := log.Log.WithValues("func", "SetupWithManager")
if os.Getenv("DISABLE_CLEANUP_ABANDONED_RUNNER_SERVICES") != commonconsts.KubeLabelValueTrue {
go r.cleanUpAbandonedRunnerServices()
} else {
logs.Info("cleanup abandoned runner services is disabled")
}
if os.Getenv("DISABLE_YATAI_COMPONENT_REGISTRATION") != commonconsts.KubeLabelValueTrue {
go r.registerDynamoComponent()
} else {
logs.Info("yatai component registration is disabled")
}
m := ctrl.NewControllerManagedBy(mgr). m := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.DynamoNimDeployment{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). For(&v1alpha1.DynamoNimDeployment{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
...@@ -3089,58 +2367,3 @@ func (r *DynamoNimDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error ...@@ -3089,58 +2367,3 @@ func (r *DynamoNimDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error
m.Owns(&autoscalingv2.HorizontalPodAutoscaler{}) m.Owns(&autoscalingv2.HorizontalPodAutoscaler{})
return m.Complete(r) return m.Complete(r)
} }
// TransformToOldHPA converts the CRD autoscaling section into the legacy
// DeploymentTargetHPAConf representation.
//
// A nil hpa yields (nil, nil). Otherwise MinReplicas and MaxReplicas are always
// copied, and the metrics list is scanned for three recognised entries:
//   - a Pods metric named commonconsts.KubeHPAQPSMetric -> QPS
//   - a Resource metric for CPU                          -> CPU
//   - a Resource metric for memory                       -> Memory
//
// Metrics of any other kind, or recognised metrics whose target is incomplete,
// are skipped. When the same metric appears more than once the last one wins.
// The returned error is currently always nil.
func TransformToOldHPA(hpa *v1alpha1.Autoscaling) (oldHpa *modelschemas.DeploymentTargetHPAConf, err error) {
	if hpa == nil {
		return nil, nil
	}

	minReplicas := int32(hpa.MinReplicas)
	maxReplicas := int32(hpa.MaxReplicas)
	oldHpa = &modelschemas.DeploymentTargetHPAConf{
		MinReplicas: &minReplicas,
		MaxReplicas: &maxReplicas,
	}

	for _, metric := range hpa.Metrics {
		switch metric.Type {
		case autoscalingv2.PodsMetricSourceType:
			if metric.Pods == nil || metric.Pods.Metric.Name != commonconsts.KubeHPAQPSMetric {
				continue
			}
			target := metric.Pods.Target
			// NOTE(review): the target type is matched against Utilization although
			// the value read is AverageValue; kept as-is, confirm this is intended.
			if target.Type != autoscalingv2.UtilizationMetricType || target.AverageValue == nil {
				continue
			}
			qps := target.AverageValue.Value()
			oldHpa.QPS = &qps

		case autoscalingv2.ResourceMetricSourceType:
			if metric.Resource == nil {
				continue
			}
			target := metric.Resource.Target
			if target.Type != autoscalingv2.UtilizationMetricType {
				continue
			}
			switch metric.Resource.Name {
			case corev1.ResourceCPU:
				if target.AverageUtilization == nil {
					continue
				}
				cpu := *target.AverageUtilization
				oldHpa.CPU = &cpu
			case corev1.ResourceMemory:
				// The memory value is taken from AverageValue, so that field must be
				// present. Previously only AverageUtilization was checked and a
				// missing AverageValue was stringified anyway. The AverageUtilization
				// requirement is kept so accepted specs convert as before.
				if target.AverageUtilization == nil || target.AverageValue == nil {
					continue
				}
				memory := target.AverageValue.String()
				oldHpa.Memory = &memory
			}
		}
	}
	return oldHpa, nil
}
...@@ -36,14 +36,13 @@ import ( ...@@ -36,14 +36,13 @@ import (
"time" "time"
"emperror.dev/errors" "emperror.dev/errors"
commonconfig "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/config"
commonconsts "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/controller_common" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/controller_common"
commonconfig "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/config"
commonconsts "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/consts"
"github.com/apparentlymart/go-shquot/shquot" "github.com/apparentlymart/go-shquot/shquot"
"github.com/ettle/strcase" "github.com/ettle/strcase"
"github.com/huandu/xstrings" "github.com/huandu/xstrings"
"github.com/mitchellh/hashstructure/v2" "github.com/mitchellh/hashstructure/v2"
"github.com/prometheus/common/version"
"github.com/prune998/docker-registry-client/registry" "github.com/prune998/docker-registry-client/registry"
"github.com/rs/xid" "github.com/rs/xid"
"github.com/sergeymakinen/go-quote/unix" "github.com/sergeymakinen/go-quote/unix"
...@@ -66,8 +65,7 @@ import ( ...@@ -66,8 +65,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/predicate"
dynamoCommon "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common" dynamoCommon "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/modelschemas" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemas"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemasv1"
yataiclient "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/yatai-client" yataiclient "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/yatai-client"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1"
) )
...@@ -206,17 +204,10 @@ func (r *DynamoNimRequestReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -206,17 +204,10 @@ func (r *DynamoNimRequestReconciler) Reconcile(ctx context.Context, req ctrl.Req
} }
} }
separateModels := isSeparateModels(dynamoNimRequest) if isSeparateModels(dynamoNimRequest) {
err = errors.New("separate models, unsupported feature")
modelsExists := false logs.Error(err, "unsupported feature")
var modelsExistsResult ctrl.Result return
var modelsExistsErr error
if separateModels {
dynamoNimRequest, modelsExists, modelsExistsResult, modelsExistsErr = r.ensureModelsExists(ctx, ensureModelsExistsOption{
dynamoNimRequest: dynamoNimRequest,
req: req,
})
} }
dynamoNimRequest, imageInfo, imageExists, imageExistsResult, err := r.ensureImageExists(ctx, ensureImageExistsOption{ dynamoNimRequest, imageInfo, imageExists, imageExistsResult, err := r.ensureImageExists(ctx, ensureImageExistsOption{
...@@ -245,27 +236,6 @@ func (r *DynamoNimRequestReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -245,27 +236,6 @@ func (r *DynamoNimRequestReconciler) Reconcile(ctx context.Context, req ctrl.Req
return return
} }
if modelsExistsErr != nil {
err = errors.Wrap(modelsExistsErr, "ensure model exists")
return
}
if separateModels && !modelsExists {
result = modelsExistsResult
dynamoNimRequest, err = r.setStatusConditions(ctx, req,
metav1.Condition{
Type: nvidiacomv1alpha1.DynamoNimRequestConditionTypeDynamoNimAvailable,
Status: metav1.ConditionUnknown,
Reason: "Reconciling",
Message: "Model is seeding",
},
)
if err != nil {
return
}
return
}
dynamoNimCR := &nvidiacomv1alpha1.DynamoNim{ dynamoNimCR := &nvidiacomv1alpha1.DynamoNim{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: dynamoNimRequest.Name, Name: dynamoNimRequest.Name,
...@@ -280,16 +250,6 @@ func (r *DynamoNimRequestReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -280,16 +250,6 @@ func (r *DynamoNimRequestReconciler) Reconcile(ctx context.Context, req ctrl.Req
}, },
} }
if separateModels {
dynamoNimCR.Annotations = map[string]string{
commonconsts.KubeAnnotationYataiImageBuilderSeparateModels: commonconsts.KubeLabelValueTrue,
}
if isAddNamespacePrefix() { // deprecated
dynamoNimCR.Annotations[commonconsts.KubeAnnotationIsMultiTenancy] = commonconsts.KubeLabelValueTrue
}
dynamoNimCR.Annotations[KubeAnnotationModelStorageNS] = dynamoNimRequest.Annotations[KubeAnnotationModelStorageNS]
}
err = ctrl.SetControllerReference(dynamoNimRequest, dynamoNimCR, r.Scheme) err = ctrl.SetControllerReference(dynamoNimRequest, dynamoNimCR, r.Scheme)
if err != nil { if err != nil {
err = errors.Wrap(err, "set controller reference") err = errors.Wrap(err, "set controller reference")
...@@ -305,7 +265,7 @@ func (r *DynamoNimRequestReconciler) Reconcile(ctx context.Context, req ctrl.Req ...@@ -305,7 +265,7 @@ func (r *DynamoNimRequestReconciler) Reconcile(ctx context.Context, req ctrl.Req
} }
if dynamoNimRequest.Spec.DownloadURL == "" { if dynamoNimRequest.Spec.DownloadURL == "" {
var dynamoNim *schemasv1.BentoFullSchema var dynamoNim *schemas.DynamoNIM
dynamoNim, err = r.getDynamoNim(ctx, dynamoNimRequest) dynamoNim, err = r.getDynamoNim(ctx, dynamoNimRequest)
if err != nil { if err != nil {
err = errors.Wrap(err, "get dynamoNim") err = errors.Wrap(err, "get dynamoNim")
...@@ -659,258 +619,6 @@ func (r *DynamoNimRequestReconciler) ensureImageExists(ctx context.Context, opt ...@@ -659,258 +619,6 @@ func (r *DynamoNimRequestReconciler) ensureImageExists(ctx context.Context, opt
return return
} }
// ensureModelsExistsOption carries the inputs of ensureModelsExists.
type ensureModelsExistsOption struct {
// dynamoNimRequest is the request whose Spec.Models are to be seeded.
dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest
// req is the reconcile request; it is forwarded to setStatusConditions.
req ctrl.Request
}
// ensureModelsExists makes sure every model listed in the request's Spec.Models
// has a model-seeder Job (and a PVC for it) and reports, through status
// conditions, whether all of those jobs have completed.
//
// Steps, in order:
//  1. Initialise the ModelsExists condition to False if it is missing/Unknown.
//  2. Return early with modelsExists=true when the condition is already True
//     and its message matches the current storage class and model tags.
//  3. Delete seeder jobs whose request hash is stale or whose model is no
//     longer listed.
//  4. For each model without a matching job, create the PVC if absent and
//     create (or update) the seeder job.
//  5. Re-list the jobs, classify them as succeeded / failed / not ready, and
//     update the status conditions accordingly.
//
// Returns the (possibly refreshed) request, whether all models are seeded, a
// ctrl.Result that is never assigned here (always the zero value), and an
// error. A failed seeder job is reported as an error after the conditions have
// been updated.
//
//nolint:gocyclo,nakedret
func (r *DynamoNimRequestReconciler) ensureModelsExists(ctx context.Context, opt ensureModelsExistsOption) (dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, modelsExists bool, result ctrl.Result, err error) { // nolint: unparam
dynamoNimRequest = opt.dynamoNimRequest
// Collect the tags in spec order; they form part of the "models exist" condition message.
modelTags := make([]string, 0)
for _, model := range dynamoNimRequest.Spec.Models {
modelTags = append(modelTags, model.Tag)
}
modelsExistsCondition := meta.FindStatusCondition(dynamoNimRequest.Status.Conditions, nvidiacomv1alpha1.DynamoNimRequestConditionTypeModelsExists)
r.Recorder.Eventf(dynamoNimRequest, corev1.EventTypeNormal, "SeparateModels", "Separate models are enabled")
// No verdict recorded yet: persist an explicit "not ready" condition first.
if modelsExistsCondition == nil || modelsExistsCondition.Status == metav1.ConditionUnknown {
r.Recorder.Eventf(dynamoNimRequest, corev1.EventTypeNormal, "ModelsExists", "Models are not ready")
modelsExistsCondition = &metav1.Condition{
Type: nvidiacomv1alpha1.DynamoNimRequestConditionTypeModelsExists,
Status: metav1.ConditionFalse,
Reason: "Reconciling",
Message: "Models are not ready",
}
dynamoNimRequest, err = r.setStatusConditions(ctx, opt.req, *modelsExistsCondition)
if err != nil {
return
}
}
// Fast path: the condition is True and was recorded for exactly this storage class and tag list.
modelsExists = modelsExistsCondition != nil && modelsExistsCondition.Status == metav1.ConditionTrue && modelsExistsCondition.Message == fmt.Sprintf("%s:%s", getJuiceFSStorageClassName(), strings.Join(modelTags, ", "))
if modelsExists {
return
}
// Index the requested models by tag for the stale-job check below.
modelsMap := make(map[string]*nvidiacomv1alpha1.BentoModel)
for _, model := range dynamoNimRequest.Spec.Models {
model := model
modelsMap[model.Tag] = &model
}
// Labels that select the seeder jobs belonging to this request.
jobLabels := map[string]string{
commonconsts.KubeLabelBentoRequest: dynamoNimRequest.Name,
commonconsts.KubeLabelIsModelSeeder: "true",
}
jobs := &batchv1.JobList{}
err = r.List(ctx, jobs, client.InNamespace(dynamoNimRequest.Namespace), client.MatchingLabels(jobLabels))
if err != nil {
err = errors.Wrap(err, "list jobs")
return
}
var dynamoNimRequestHashStr string
dynamoNimRequestHashStr, err = r.getHashStr(dynamoNimRequest)
if err != nil {
err = errors.Wrapf(err, "get dynamoNimRequest %s/%s hash string", dynamoNimRequest.Namespace, dynamoNimRequest.Name)
return
}
// Prune jobs that are stale (hash mismatch) or target a model no longer requested;
// remember the model tags that still have a valid job.
existingJobModelTags := make(map[string]struct{})
for _, job_ := range jobs.Items {
job_ := job_
oldHash := job_.Annotations[KubeAnnotationDynamoNimRequestHash]
if oldHash != dynamoNimRequestHashStr {
r.Recorder.Eventf(dynamoNimRequest, corev1.EventTypeNormal, "DeleteJob", "Because hash changed, delete old job %s, oldHash: %s, newHash: %s", job_.Name, oldHash, dynamoNimRequestHashStr)
// --cascade=foreground
err = r.Delete(ctx, &job_, &client.DeleteOptions{
PropagationPolicy: &[]metav1.DeletionPropagation{metav1.DeletePropagationForeground}[0],
})
if err != nil {
err = errors.Wrapf(err, "delete job %s", job_.Name)
return
}
continue
}
// The model tag is rebuilt from the job's repository and version labels.
modelTag := fmt.Sprintf("%s:%s", job_.Labels[commonconsts.KubeLabelYataiModelRepository], job_.Labels[commonconsts.KubeLabelYataiModel])
_, ok := modelsMap[modelTag]
if !ok {
r.Recorder.Eventf(dynamoNimRequest, corev1.EventTypeNormal, "DeleteJob", "Due to the nonexistence of the model %s, job %s has been deleted.", modelTag, job_.Name)
// --cascade=foreground
err = r.Delete(ctx, &job_, &client.DeleteOptions{
PropagationPolicy: &[]metav1.DeletionPropagation{metav1.DeletePropagationForeground}[0],
})
if err != nil {
err = errors.Wrapf(err, "delete job %s", job_.Name)
return
}
} else {
existingJobModelTags[modelTag] = struct{}{}
}
}
// Create the PVC and seeder job for every model that has no valid job yet.
for _, model := range dynamoNimRequest.Spec.Models {
if _, ok := existingJobModelTags[model.Tag]; ok {
continue
}
model := model
pvc := &corev1.PersistentVolumeClaim{}
pvcName := r.getModelPVCName(dynamoNimRequest, &model)
err = r.Get(ctx, client.ObjectKey{
Namespace: dynamoNimRequest.Namespace,
Name: pvcName,
}, pvc)
isPVCNotFound := k8serrors.IsNotFound(err)
if err != nil && !isPVCNotFound {
err = errors.Wrapf(err, "get PVC %s/%s", dynamoNimRequest.Namespace, pvcName)
return
}
if isPVCNotFound {
pvc = r.generateModelPVC(GenerateModelPVCOption{
DynamoNimRequest: dynamoNimRequest,
Model: &model,
})
err = r.Create(ctx, pvc)
// An AlreadyExists error on create is tolerated.
isPVCAlreadyExists := k8serrors.IsAlreadyExists(err)
if err != nil && !isPVCAlreadyExists {
err = errors.Wrapf(err, "create model %s/%s pvc", dynamoNimRequest.Namespace, model.Tag)
return
}
}
var job *batchv1.Job
job, err = r.generateModelSeederJob(ctx, GenerateModelSeederJobOption{
DynamoNimRequest: dynamoNimRequest,
Model: &model,
})
if err != nil {
err = errors.Wrap(err, "generate model seeder job")
return
}
oldJob := &batchv1.Job{}
err = r.Get(ctx, client.ObjectKeyFromObject(job), oldJob)
oldJobIsNotFound := k8serrors.IsNotFound(err)
if err != nil && !oldJobIsNotFound {
err = errors.Wrap(err, "get job")
return
}
if oldJobIsNotFound {
err = r.Create(ctx, job)
if err != nil {
err = errors.Wrap(err, "create job")
return
}
r.Recorder.Eventf(dynamoNimRequest, corev1.EventTypeNormal, "CreateJob", "Job %s has been created.", job.Name)
} else if !reflect.DeepEqual(job.Labels, oldJob.Labels) || !reflect.DeepEqual(job.Annotations, oldJob.Annotations) {
// NOTE(review): on a label/annotation mismatch the generated job is given the
// EXISTING job's labels and annotations before Update — confirm this direction is intended.
job.Labels = oldJob.Labels
job.Annotations = oldJob.Annotations
err = r.Update(ctx, job)
if err != nil {
err = errors.Wrap(err, "update job")
return
}
r.Recorder.Eventf(dynamoNimRequest, corev1.EventTypeNormal, "UpdateJob", "Job %s has been updated.", job.Name)
}
}
// Re-list so that jobs created above are included in the status evaluation.
jobs = &batchv1.JobList{}
err = r.List(ctx, jobs, client.InNamespace(dynamoNimRequest.Namespace), client.MatchingLabels(jobLabels))
if err != nil {
err = errors.Wrap(err, "list jobs")
return
}
succeedModelTags := make(map[string]struct{})
failedJobNames := make([]string, 0)
notReadyJobNames := make([]string, 0)
for _, job_ := range jobs.Items {
// A job counts as succeeded when its succeeded count equals the requested completions.
if job_.Spec.Completions != nil && job_.Status.Succeeded == *job_.Spec.Completions {
modelTag := fmt.Sprintf("%s:%s", job_.Labels[commonconsts.KubeLabelYataiModelRepository], job_.Labels[commonconsts.KubeLabelYataiModel])
succeedModelTags[modelTag] = struct{}{}
continue
}
if job_.Status.Failed > 0 {
for _, condition := range job_.Status.Conditions {
if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue {
failedJobNames = append(failedJobNames, job_.Name)
// NOTE(review): this continue only advances the inner conditions loop, so a
// failed job still reaches the notReadyJobNames append below. That list is
// not used when failedJobNames is non-empty, because the function returns first.
continue
}
}
}
notReadyJobNames = append(notReadyJobNames, job_.Name)
}
// Any failed job: record it on both conditions and surface it as an error.
if len(failedJobNames) > 0 {
msg := fmt.Sprintf("Model seeder jobs failed: %s", strings.Join(failedJobNames, ", "))
// NOTE(review): the failure is emitted as a Normal event — confirm Warning is not wanted.
r.Recorder.Event(dynamoNimRequest, corev1.EventTypeNormal, "ModelsExists", msg)
dynamoNimRequest, err = r.setStatusConditions(ctx, opt.req,
metav1.Condition{
Type: nvidiacomv1alpha1.DynamoNimRequestConditionTypeModelsExists,
Status: metav1.ConditionFalse,
Reason: "Reconciling",
Message: msg,
},
metav1.Condition{
Type: nvidiacomv1alpha1.DynamoNimRequestConditionTypeDynamoNimAvailable,
Status: metav1.ConditionFalse,
Reason: "Reconciling",
Message: msg,
},
)
if err != nil {
return
}
err = errors.New(msg)
return
}
// All models exist only if every requested tag has a succeeded job.
modelsExists = true
for _, model := range dynamoNimRequest.Spec.Models {
if _, ok := succeedModelTags[model.Tag]; !ok {
modelsExists = false
break
}
}
if modelsExists {
// The message written here is the one the fast path at the top compares against.
dynamoNimRequest, err = r.setStatusConditions(ctx, opt.req,
metav1.Condition{
Type: nvidiacomv1alpha1.DynamoNimRequestConditionTypeModelsExists,
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: fmt.Sprintf("%s:%s", getJuiceFSStorageClassName(), strings.Join(modelTags, ", ")),
},
metav1.Condition{
Type: nvidiacomv1alpha1.DynamoNimRequestConditionTypeModelsSeeding,
Status: metav1.ConditionFalse,
Reason: "Reconciling",
Message: "All models have been seeded.",
},
)
if err != nil {
return
}
} else {
// Still seeding: report which jobs have not finished yet.
dynamoNimRequest, err = r.setStatusConditions(ctx, opt.req,
metav1.Condition{
Type: nvidiacomv1alpha1.DynamoNimRequestConditionTypeModelsSeeding,
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: fmt.Sprintf("Model seeder jobs are not ready: %s.", strings.Join(notReadyJobNames, ", ")),
},
)
if err != nil {
return
}
}
return
}
func (r *DynamoNimRequestReconciler) setStatusConditions(ctx context.Context, req ctrl.Request, conditions ...metav1.Condition) (dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, err error) { func (r *DynamoNimRequestReconciler) setStatusConditions(ctx context.Context, req ctrl.Request, conditions ...metav1.Condition) (dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, err error) {
dynamoNimRequest = &nvidiacomv1alpha1.DynamoNimRequest{} dynamoNimRequest = &nvidiacomv1alpha1.DynamoNimRequest{}
/* /*
...@@ -1043,14 +751,7 @@ func (r *DynamoNimRequestReconciler) makeSureDockerConfigJSONSecret(ctx context. ...@@ -1043,14 +751,7 @@ func (r *DynamoNimRequestReconciler) makeSureDockerConfigJSONSecret(ctx context.
//nolint:nakedret //nolint:nakedret
func (r *DynamoNimRequestReconciler) getYataiClient(ctx context.Context) (yataiClient **yataiclient.YataiClient, yataiConf **commonconfig.YataiConfig, err error) { func (r *DynamoNimRequestReconciler) getYataiClient(ctx context.Context) (yataiClient **yataiclient.YataiClient, yataiConf **commonconfig.YataiConfig, err error) {
yataiConf_, err := commonconfig.GetYataiConfig(ctx, func(ctx context.Context, namespace, name string) (*corev1.Secret, error) { yataiConf_, err := commonconfig.GetYataiConfig(ctx)
secret := &corev1.Secret{}
err := r.Get(ctx, types.NamespacedName{
Namespace: namespace,
Name: name,
}, secret)
return secret, errors.Wrap(err, "get secret")
}, commonconsts.YataiImageBuilderComponentName, false)
isNotFound := k8serrors.IsNotFound(err) isNotFound := k8serrors.IsNotFound(err)
if err != nil && !isNotFound { if err != nil && !isNotFound {
err = errors.Wrap(err, "get yatai config") err = errors.Wrap(err, "get yatai config")
...@@ -1102,7 +803,7 @@ func (r *DynamoNimRequestReconciler) getYataiClientWithAuth(ctx context.Context, ...@@ -1102,7 +803,7 @@ func (r *DynamoNimRequestReconciler) getYataiClientWithAuth(ctx context.Context,
} }
//nolint:nakedret //nolint:nakedret
func (r *DynamoNimRequestReconciler) getDockerRegistry(ctx context.Context, dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest) (dockerRegistry modelschemas.DockerRegistrySchema, err error) { func (r *DynamoNimRequestReconciler) getDockerRegistry(ctx context.Context, dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest) (dockerRegistry schemas.DockerRegistrySchema, err error) {
if dynamoNimRequest != nil && dynamoNimRequest.Spec.DockerConfigJSONSecretName != "" { if dynamoNimRequest != nil && dynamoNimRequest.Spec.DockerConfigJSONSecretName != "" {
secret := &corev1.Secret{} secret := &corev1.Secret{}
err = r.Get(ctx, types.NamespacedName{ err = r.Get(ctx, types.NamespacedName{
...@@ -1174,14 +875,7 @@ func (r *DynamoNimRequestReconciler) getDockerRegistry(ctx context.Context, dyna ...@@ -1174,14 +875,7 @@ func (r *DynamoNimRequestReconciler) getDockerRegistry(ctx context.Context, dyna
return return
} }
dockerRegistryConfig, err := commonconfig.GetDockerRegistryConfig(ctx, func(ctx context.Context, namespace, name string) (*corev1.Secret, error) { dockerRegistryConfig, err := commonconfig.GetDockerRegistryConfig()
secret := &corev1.Secret{}
err := r.Get(ctx, types.NamespacedName{
Namespace: namespace,
Name: name,
}, secret)
return secret, errors.Wrap(err, "get secret")
})
if err != nil { if err != nil {
err = errors.Wrap(err, "get docker registry") err = errors.Wrap(err, "get docker registry")
return return
...@@ -1211,7 +905,7 @@ func (r *DynamoNimRequestReconciler) getDockerRegistry(ctx context.Context, dyna ...@@ -1211,7 +905,7 @@ func (r *DynamoNimRequestReconciler) getDockerRegistry(ctx context.Context, dyna
modelRepositoryInClusterURI = fmt.Sprintf("docker.io/%s", modelRepositoryName) modelRepositoryInClusterURI = fmt.Sprintf("docker.io/%s", modelRepositoryName)
} }
} }
dockerRegistry = modelschemas.DockerRegistrySchema{ dockerRegistry = schemas.DockerRegistrySchema{
Server: dockerRegistryConfig.Server, Server: dockerRegistryConfig.Server,
Username: dockerRegistryConfig.Username, Username: dockerRegistryConfig.Username,
Password: dockerRegistryConfig.Password, Password: dockerRegistryConfig.Password,
...@@ -1243,21 +937,7 @@ func getDynamoNimImagePrefix(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimReques ...@@ -1243,21 +937,7 @@ func getDynamoNimImagePrefix(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimReques
return "" return ""
} }
func getModelNamespace(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest) string { func getDynamoNimImageName(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, dockerRegistry schemas.DockerRegistrySchema, dynamoNimRepositoryName, dynamoNimVersion string, inCluster bool) string {
if dynamoNimRequest == nil {
return ""
}
prefix := dynamoNimRequest.Annotations[KubeAnnotationModelStorageNS]
if prefix != "" {
return prefix
}
if isAddNamespacePrefix() {
return dynamoNimRequest.Namespace
}
return ""
}
func getDynamoNimImageName(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, dockerRegistry modelschemas.DockerRegistrySchema, dynamoNimRepositoryName, dynamoNimVersion string, inCluster bool) string {
if dynamoNimRequest != nil && dynamoNimRequest.Spec.Image != "" { if dynamoNimRequest != nil && dynamoNimRequest.Spec.Image != "" {
return dynamoNimRequest.Spec.Image return dynamoNimRequest.Spec.Image
} }
...@@ -1292,7 +972,7 @@ func isSeparateModels(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest) (sep ...@@ -1292,7 +972,7 @@ func isSeparateModels(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest) (sep
return dynamoNimRequest.Annotations[commonconsts.KubeAnnotationYataiImageBuilderSeparateModels] == commonconsts.KubeLabelValueTrue return dynamoNimRequest.Annotations[commonconsts.KubeAnnotationYataiImageBuilderSeparateModels] == commonconsts.KubeLabelValueTrue
} }
func checkImageExists(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, dockerRegistry modelschemas.DockerRegistrySchema, imageName string) (bool, error) { func checkImageExists(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, dockerRegistry schemas.DockerRegistrySchema, imageName string) (bool, error) {
if dynamoNimRequest.Annotations["yatai.ai/force-build-image"] == commonconsts.KubeLabelValueTrue { if dynamoNimRequest.Annotations["yatai.ai/force-build-image"] == commonconsts.KubeLabelValueTrue {
return false, nil return false, nil
} }
...@@ -1330,7 +1010,7 @@ func checkImageExists(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, dock ...@@ -1330,7 +1010,7 @@ func checkImageExists(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, dock
} }
type ImageInfo struct { type ImageInfo struct {
DockerRegistry modelschemas.DockerRegistrySchema DockerRegistry schemas.DockerRegistrySchema
DockerConfigJSONSecretName string DockerConfigJSONSecretName string
ImageName string ImageName string
InClusterImageName string InClusterImageName string
...@@ -1362,11 +1042,7 @@ func (r *DynamoNimRequestReconciler) getImageInfo(ctx context.Context, opt GetIm ...@@ -1362,11 +1042,7 @@ func (r *DynamoNimRequestReconciler) getImageInfo(ctx context.Context, opt GetIm
if imageInfo.DockerConfigJSONSecretName == "" { if imageInfo.DockerConfigJSONSecretName == "" {
var dockerRegistryConf *commonconfig.DockerRegistryConfig var dockerRegistryConf *commonconfig.DockerRegistryConfig
dockerRegistryConf, err = commonconfig.GetDockerRegistryConfig(ctx, func(ctx context.Context, namespace, name string) (*corev1.Secret, error) { dockerRegistryConf, err = commonconfig.GetDockerRegistryConfig()
secret := &corev1.Secret{}
err := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, secret)
return secret, errors.Wrap(err, "get docker registry secret")
})
if err != nil { if err != nil {
err = errors.Wrap(err, "get docker registry") err = errors.Wrap(err, "get docker registry")
return return
...@@ -1387,7 +1063,7 @@ func (r *DynamoNimRequestReconciler) getImageInfo(ctx context.Context, opt GetIm ...@@ -1387,7 +1063,7 @@ func (r *DynamoNimRequestReconciler) getImageInfo(ctx context.Context, opt GetIm
return return
} }
func (r *DynamoNimRequestReconciler) getDynamoNim(ctx context.Context, dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest) (dynamoNim *schemasv1.BentoFullSchema, err error) { func (r *DynamoNimRequestReconciler) getDynamoNim(ctx context.Context, dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest) (dynamoNim *schemas.DynamoNIM, err error) {
dynamoNimRepositoryName, _, dynamoNimVersion := xstrings.Partition(dynamoNimRequest.Spec.BentoTag, ":") dynamoNimRepositoryName, _, dynamoNimVersion := xstrings.Partition(dynamoNimRequest.Spec.BentoTag, ":")
yataiClient_, _, err := r.getYataiClient(ctx) yataiClient_, _, err := r.getYataiClient(ctx)
...@@ -1418,38 +1094,6 @@ func (r *DynamoNimRequestReconciler) getImageBuilderJobName() string { ...@@ -1418,38 +1094,6 @@ func (r *DynamoNimRequestReconciler) getImageBuilderJobName() string {
return fmt.Sprintf("yatai-dynamonim-image-builder-%s", guid.String()) return fmt.Sprintf("yatai-dynamonim-image-builder-%s", guid.String())
} }
// getModelSeederJobName returns a fresh job name made of the
// "yatai-model-seeder-" prefix followed by a newly generated xid.
func (r *DynamoNimRequestReconciler) getModelSeederJobName() string {
	suffix := xid.New().String()
	return fmt.Sprintf("yatai-model-seeder-%s", suffix)
}
// getModelSeederJobLabels builds the label set for a model-seeder Job.
// The request's BentoTag and the model's Tag are each split at the first ":"
// into a repository part and a version part, which are stored under separate
// labels alongside the request name and the model-seeder marker.
func (r *DynamoNimRequestReconciler) getModelSeederJobLabels(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, model *nvidiacomv1alpha1.BentoModel) map[string]string {
	bentoRepo, _, bentoVersion := xstrings.Partition(dynamoNimRequest.Spec.BentoTag, ":")
	modelRepo, _, modelVersion := xstrings.Partition(model.Tag, ":")

	labels := make(map[string]string, 6)
	labels[commonconsts.KubeLabelBentoRequest] = dynamoNimRequest.Name
	labels[commonconsts.KubeLabelIsModelSeeder] = "true"
	labels[commonconsts.KubeLabelYataiModelRepository] = modelRepo
	labels[commonconsts.KubeLabelYataiModel] = modelVersion
	labels[commonconsts.KubeLabelYataiBentoRepository] = bentoRepo
	labels[commonconsts.KubeLabelYataiBento] = bentoVersion
	return labels
}
// getModelSeederPodLabels builds the label set for the pods of a model-seeder
// Job. It contains the request name, the repository/version parts of both the
// BentoTag and the model Tag (split at the first ":"), the model-seeder
// marker, and additionally the image-builder marker.
func (r *DynamoNimRequestReconciler) getModelSeederPodLabels(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, model *nvidiacomv1alpha1.BentoModel) map[string]string {
	bentoRepo, _, bentoVersion := xstrings.Partition(dynamoNimRequest.Spec.BentoTag, ":")
	modelRepo, _, modelVersion := xstrings.Partition(model.Tag, ":")

	labels := make(map[string]string, 7)
	labels[commonconsts.KubeLabelBentoRequest] = dynamoNimRequest.Name
	labels[commonconsts.KubeLabelIsModelSeeder] = "true"
	labels[commonconsts.KubeLabelIsBentoImageBuilder] = "true"
	labels[commonconsts.KubeLabelYataiModelRepository] = modelRepo
	labels[commonconsts.KubeLabelYataiModel] = modelVersion
	labels[commonconsts.KubeLabelYataiBentoRepository] = bentoRepo
	labels[commonconsts.KubeLabelYataiBento] = bentoVersion
	return labels
}
func (r *DynamoNimRequestReconciler) getImageBuilderJobLabels(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest) map[string]string { func (r *DynamoNimRequestReconciler) getImageBuilderJobLabels(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest) map[string]string {
dynamoNimRepositoryName, _, dynamoNimVersion := xstrings.Partition(dynamoNimRequest.Spec.BentoTag, ":") dynamoNimRepositoryName, _, dynamoNimVersion := xstrings.Partition(dynamoNimRequest.Spec.BentoTag, ":")
labels := map[string]string{ labels := map[string]string{
...@@ -1484,413 +1128,17 @@ func hash(text string) string { ...@@ -1484,413 +1128,17 @@ func hash(text string) string {
return hex.EncodeToString(hasher.Sum(nil)) return hex.EncodeToString(hasher.Sum(nil))
} }
func (r *DynamoNimRequestReconciler) getModelPVCName(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, model *nvidiacomv1alpha1.BentoModel) string { type GenerateImageBuilderJobOption struct {
storageClassName := getJuiceFSStorageClassName() ImageInfo ImageInfo
var hashStr string
ns := getModelNamespace(dynamoNimRequest)
if ns == "" {
hashStr = hash(fmt.Sprintf("%s:%s", storageClassName, model.Tag))
} else {
hashStr = hash(fmt.Sprintf("%s:%s:%s", storageClassName, ns, model.Tag))
}
pvcName := fmt.Sprintf("model-seeder-%s", hashStr)
if len(pvcName) > 63 {
pvcName = pvcName[:63]
}
return pvcName
}
// getJuiceFSModelPath returns the storage path used for the given model.
//
// The model tag is partitioned around ":" into repository and version. When
// isHuggingfaceModel reports true the version segment is replaced by "all".
// With an empty model namespace the path is "models/.shared/<repo>/<version>";
// otherwise it is "models/<namespace>/<repo>/<version>".
func (r *DynamoNimRequestReconciler) getJuiceFSModelPath(dynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest, model *nvidiacomv1alpha1.BentoModel) string {
	repo, _, version := xstrings.Partition(model.Tag, ":")
	namespace := getModelNamespace(dynamoNimRequest)
	if isHuggingfaceModel(model) {
		version = "all"
	}

	if namespace == "" {
		return fmt.Sprintf("models/.shared/%s/%s", repo, version)
	}
	return fmt.Sprintf("models/%s/%s/%s", namespace, repo, version)
}
// isHuggingfaceModel reports whether the model's DownloadURL begins with the
// "hf://" prefix.
func isHuggingfaceModel(model *nvidiacomv1alpha1.BentoModel) bool {
	const huggingfaceURLPrefix = "hf://"
	return strings.HasPrefix(model.DownloadURL, huggingfaceURLPrefix)
}
// GenerateModelPVCOption carries the inputs consumed by generateModelPVC.
type GenerateModelPVCOption struct {
// DynamoNimRequest is the request the claim is generated for; its Namespace
// becomes the claim's namespace.
DynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest
// Model is the model the claim will hold; its optional Size drives the
// requested storage and it is used to derive the claim name and path.
Model *nvidiacomv1alpha1.BentoModel
}
// generateModelPVC builds (but does not create) the PersistentVolumeClaim that
// stores a model for the given request.
//
// Storage request: 100Gi when the model declares no size; otherwise the
// declared size, raised to at least 1Gi, and then doubled. The claim is
// ReadWriteMany, uses the storage class from getJuiceFSStorageClassName, and
// records the model path in the "path" annotation.
func (r *DynamoNimRequestReconciler) generateModelPVC(opt GenerateModelPVCOption) (pvc *corev1.PersistentVolumeClaim) {
	requested := resource.MustParse("100Gi")
	if declared := opt.Model.Size; declared != nil {
		requested = *declared
		floor := resource.MustParse("1Gi")
		if requested.Value() < floor.Value() {
			requested = floor
		}
		// Request twice the (clamped) model size.
		requested.Set(2 * requested.Value())
	}

	modelPath := r.getJuiceFSModelPath(opt.DynamoNimRequest, opt.Model)
	claimName := r.getModelPVCName(opt.DynamoNimRequest, opt.Model)

	meta := metav1.ObjectMeta{
		Name:        claimName,
		Namespace:   opt.DynamoNimRequest.Namespace,
		Annotations: map[string]string{"path": modelPath},
	}
	spec := corev1.PersistentVolumeClaimSpec{
		AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
		Resources: corev1.VolumeResourceRequirements{
			Requests: corev1.ResourceList{corev1.ResourceStorage: requested},
		},
		StorageClassName: ptr.To(getJuiceFSStorageClassName()),
	}

	pvc = &corev1.PersistentVolumeClaim{ObjectMeta: meta, Spec: spec}
	return pvc
}
type GenerateModelSeederJobOption struct {
DynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest DynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest
Model *nvidiacomv1alpha1.BentoModel
} }
//nolint:nakedret //nolint:nakedret
func (r *DynamoNimRequestReconciler) generateModelSeederJob(ctx context.Context, opt GenerateModelSeederJobOption) (job *batchv1.Job, err error) { func (r *DynamoNimRequestReconciler) generateImageBuilderJob(ctx context.Context, opt GenerateImageBuilderJobOption) (job *batchv1.Job, err error) {
// nolint: gosimple // nolint: gosimple
podTemplateSpec, err := r.generateModelSeederPodTemplateSpec(ctx, GenerateModelSeederPodTemplateSpecOption(opt)) podTemplateSpec, err := r.generateImageBuilderPodTemplateSpec(ctx, GenerateImageBuilderPodTemplateSpecOption(opt))
if err != nil { if err != nil {
err = errors.Wrap(err, "generate model seeder pod template spec") err = errors.Wrap(err, "generate image builder pod template spec")
return
}
kubeAnnotations := make(map[string]string)
hashStr, err := r.getHashStr(opt.DynamoNimRequest)
if err != nil {
err = errors.Wrap(err, "failed to get hash string")
return
}
kubeAnnotations[KubeAnnotationDynamoNimRequestHash] = hashStr
job = &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: r.getModelSeederJobName(),
Namespace: opt.DynamoNimRequest.Namespace,
Labels: r.getModelSeederJobLabels(opt.DynamoNimRequest, opt.Model),
Annotations: kubeAnnotations,
},
Spec: batchv1.JobSpec{
Completions: ptr.To(int32(1)),
Parallelism: ptr.To(int32(1)),
PodFailurePolicy: &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{
{
Action: batchv1.PodFailurePolicyActionFailJob,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
ContainerName: ptr.To(ModelSeederContainerName),
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{ModelSeederJobFailedExitCode},
},
},
},
},
Template: *podTemplateSpec,
},
}
err = ctrl.SetControllerReference(opt.DynamoNimRequest, job, r.Scheme)
if err != nil {
err = errors.Wrapf(err, "set controller reference for job %s", job.Name)
return
}
return
}
// GenerateModelSeederPodTemplateSpecOption carries the inputs consumed by
// generateModelSeederPodTemplateSpec. generateModelSeederJob converts its own
// option value directly into this type, so the two structs must keep
// identical field layouts.
type GenerateModelSeederPodTemplateSpecOption struct {
// DynamoNimRequest is the request the seeder pod is generated for.
DynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest
// Model is the single model the seeder pod downloads.
Model *nvidiacomv1alpha1.BentoModel
}
// generateModelSeederPodTemplateSpec builds the pod template for the model
// seeder job of a single model.
//
// The resulting pod has one container (ModelSeederContainerName) that runs a
// generated bash script: it downloads the model (hf://, s3://, gs:// or plain
// HTTP via curl) into the model PVC mounted at /juicefs-workspace and drops a
// ".exists" marker file so later runs can skip the download.
//
// When the model has no DownloadURL, the URL is resolved through the yatai
// service: either a presigned URL, or the yatai download endpoint together
// with an API-token header.
//
// Scheduling-related pod spec fields are overwritten from the optional
// "extra_pod_spec" entry of the "yatai-image-builder-config" configmap; a
// missing configmap is tolerated.
//
// Returns the pod template, or a wrapped error from yatai lookups, template
// rendering, namespace/configmap retrieval or YAML decoding.
//
//nolint:nakedret
func (r *DynamoNimRequestReconciler) generateModelSeederPodTemplateSpec(ctx context.Context, opt GenerateModelSeederPodTemplateSpecOption) (pod *corev1.PodTemplateSpec, err error) {
kubeLabels := r.getModelSeederPodLabels(opt.DynamoNimRequest, opt.Model)
volumes := make([]corev1.Volume, 0)
volumeMounts := make([]corev1.VolumeMount, 0)
// NOTE(review): yataiAPITokenSecretName is never assigned a non-empty value
// in this function, so the EnvFrom secret branch below is currently dead.
yataiAPITokenSecretName := ""
internalImages := commonconfig.GetInternalImages()
logrus.Infof("Model seeder is using the images %v", *internalImages)
// Fixed resource envelope for the downloader container.
downloaderContainerResources := corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
corev1.ResourceMemory: resource.MustParse("3000Mi"),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
corev1.ResourceMemory: resource.MustParse("1000Mi"),
},
}
downloaderContainerEnvFrom := opt.DynamoNimRequest.Spec.DownloaderContainerEnvFrom
if yataiAPITokenSecretName != "" {
downloaderContainerEnvFrom = append(downloaderContainerEnvFrom, corev1.EnvFromSource{
SecretRef: &corev1.SecretEnvSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: yataiAPITokenSecretName,
},
},
})
}
containers := make([]corev1.Container, 0)
model := opt.Model
modelRepositoryName, _, modelVersion := xstrings.Partition(model.Tag, ":")
modelDownloadURL := model.DownloadURL
modelDownloadHeader := ""
// No explicit download URL on the model: resolve one through the yatai service.
if modelDownloadURL == "" {
var yataiClient_ **yataiclient.YataiClient
var yataiConf_ **commonconfig.YataiConfig
yataiClient_, yataiConf_, err = r.getYataiClient(ctx)
if err != nil {
err = errors.Wrap(err, "get yatai client")
return
}
if yataiClient_ == nil || yataiConf_ == nil {
err = errors.New("can't get yatai client, please check yatai configuration")
return
}
yataiClient := *yataiClient_
yataiConf := *yataiConf_
var model_ *schemasv1.ModelFullSchema
// NOTE(review): the events in this branch use the reason
// "GenerateImageBuilderPod" although this is the model seeder path, while
// the configmap events further down use "GenerateModelSeederPod".
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting model %s from yatai service", model.Tag)
model_, err = yataiClient.GetModel(ctx, modelRepositoryName, modelVersion)
if err != nil {
err = errors.Wrap(err, "get model")
return
}
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Model %s is got from yatai service", model.Tag)
// Presigned-URL strategy: ask yatai for a presigned download URL.
// Otherwise: download from the yatai endpoint with an API-token header.
if model_.TransmissionStrategy != nil && *model_.TransmissionStrategy == modelschemas.TransmissionStrategyPresignedURL {
var model0 *schemasv1.ModelSchema
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting presigned url for model %s from yatai service", model.Tag)
model0, err = yataiClient.PresignModelDownloadURL(ctx, modelRepositoryName, modelVersion)
if err != nil {
err = errors.Wrap(err, "presign model download url")
return
}
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Presigned url for model %s is got from yatai service", model.Tag)
modelDownloadURL = model0.PresignedDownloadUrl
} else {
modelDownloadURL = fmt.Sprintf("%s/api/v1/model_repositories/%s/models/%s/download", yataiConf.Endpoint, modelRepositoryName, modelVersion)
// The "$<env name>" part is left for the shell to expand when the
// generated script runs.
modelDownloadHeader = fmt.Sprintf("%s: %s:%s:$%s", commonconsts.YataiApiTokenHeaderName, commonconsts.YataiImageBuilderComponentName, yataiConf.ClusterName, commonconsts.EnvYataiApiToken)
}
}
// Mount point of the model PVC inside the seeder container.
modelDirPath := "/juicefs-workspace"
var modelSeedCommandOutput bytes.Buffer
// Render the bash script executed by the seeder container.
// NOTE(review): the template values are interpolated into the script without
// shell escaping — confirm that URLs and tags are trusted input.
err = template.Must(template.New("script").Parse(`
set -e
mkdir -p {{.ModelDirPath}}
url="{{.ModelDownloadURL}}"
if [[ ${url} == hf://* ]]; then
if [ -f "{{.ModelDirPath}}/{{.ModelVersion}}.exists" ]; then
echo "Model {{.ModelDirPath}}/{{.ModelVersion}}.exists already exists, skip downloading"
exit 0
fi
else
if [ -f "{{.ModelDirPath}}/.exists" ]; then
echo "Model {{.ModelDirPath}} already exists, skip downloading"
exit 0
fi
fi
cleanup() {
echo "Cleaning up..."
rm -rf /tmp/model
rm -f /tmp/downloaded.tar
}
trap cleanup EXIT
if [[ ${url} == hf://* ]]; then
mkdir -p /tmp/model
hf_url="${url:5}"
model_id=$(echo "$hf_url" | awk -F '@' '{print $1}')
revision=$(echo "$hf_url" | awk -F '@' '{print $2}')
endpoint=$(echo "$hf_url" | awk -F '@' '{print $3}')
export HF_ENDPOINT=${endpoint}
echo "Downloading model ${model_id} (endpoint=${endpoint}, revision=${revision}) from Huggingface..."
huggingface-cli download ${model_id} --revision ${revision} --cache-dir {{.ModelDirPath}}
else
echo "Downloading model {{.ModelRepositoryName}}:{{.ModelVersion}} to /tmp/downloaded.tar..."
if [[ ${url} == s3://* ]]; then
echo "Downloading from s3..."
aws s3 cp ${url} /tmp/downloaded.tar
elif [[ ${url} == gs://* ]]; then
echo "Downloading from GCS..."
gsutil cp ${url} /tmp/downloaded.tar
else
curl --fail -L -H "{{.ModelDownloadHeader}}" ${url} --output /tmp/downloaded.tar --progress-bar
fi
cd {{.ModelDirPath}}
echo "Extracting model tar file..."
tar -xvf /tmp/downloaded.tar
fi
if [[ ${url} == hf://* ]]; then
echo "Creating {{.ModelDirPath}}/{{.ModelVersion}}.exists file..."
touch {{.ModelDirPath}}/{{.ModelVersion}}.exists
else
echo "Creating {{.ModelDirPath}}/.exists file..."
touch {{.ModelDirPath}}/.exists
fi
echo "Done"
`)).Execute(&modelSeedCommandOutput, map[string]interface{}{
"ModelDirPath": modelDirPath,
"ModelDownloadURL": modelDownloadURL,
"ModelDownloadHeader": modelDownloadHeader,
"ModelRepositoryName": modelRepositoryName,
"ModelVersion": modelVersion,
// NOTE(review): HuggingfaceModelDir is supplied but the template above
// never references it.
"HuggingfaceModelDir": fmt.Sprintf("models--%s", strings.ReplaceAll(modelRepositoryName, "/", "--")),
})
if err != nil {
err = errors.Wrap(err, "failed to generate download command")
return
}
modelSeedCommand := modelSeedCommandOutput.String()
// The volume is named after the claim it references.
pvcName := r.getModelPVCName(opt.DynamoNimRequest, model)
volumes = append(volumes, corev1.Volume{
Name: pvcName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcName,
},
},
})
containers = append(containers, corev1.Container{
Name: ModelSeederContainerName,
Image: internalImages.BentoDownloader,
Command: []string{
"bash",
"-c",
modelSeedCommand,
},
VolumeMounts: append(volumeMounts, corev1.VolumeMount{
Name: pvcName,
MountPath: modelDirPath,
}),
Resources: downloaderContainerResources,
EnvFrom: downloaderContainerEnvFrom,
Env: []corev1.EnvVar{
{
Name: "AWS_EC2_METADATA_DISABLED",
Value: "true",
},
},
})
// Copy the request's model-seeder hash annotation onto the pod template.
kubeAnnotations := make(map[string]string)
kubeAnnotations[KubeAnnotationDynamoNimRequestModelSeederHash] = opt.DynamoNimRequest.Annotations[KubeAnnotationDynamoNimRequestModelSeederHash]
pod = &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: kubeLabels,
Annotations: kubeAnnotations,
},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Volumes: volumes,
Containers: containers,
},
}
var globalExtraPodSpec *dynamoCommon.ExtraPodSpec
// The callback fetches secrets through the reconciler's client.
configNamespace, err := commonconfig.GetYataiImageBuilderNamespace(ctx, func(ctx context.Context, namespace, name string) (*corev1.Secret, error) {
secret := &corev1.Secret{}
err := r.Get(ctx, types.NamespacedName{
Namespace: namespace,
Name: name,
}, secret)
return secret, errors.Wrap(err, "get secret")
})
if err != nil {
err = errors.Wrap(err, "failed to get Yatai image builder namespace")
return
}
configCmName := "yatai-image-builder-config"
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateModelSeederPod", "Getting configmap %s from namespace %s", configCmName, configNamespace)
configCm := &corev1.ConfigMap{}
err = r.Get(ctx, types.NamespacedName{Name: configCmName, Namespace: configNamespace}, configCm)
configCmIsNotFound := k8serrors.IsNotFound(err)
if err != nil && !configCmIsNotFound {
err = errors.Wrap(err, "failed to get configmap")
return
}
// A not-found configmap is not an error: clear it so it is not returned.
err = nil
if !configCmIsNotFound {
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateModelSeederPod", "Configmap %s is got from namespace %s", configCmName, configNamespace)
globalExtraPodSpec = &dynamoCommon.ExtraPodSpec{}
if val, ok := configCm.Data["extra_pod_spec"]; ok {
err = yaml.Unmarshal([]byte(val), globalExtraPodSpec)
if err != nil {
err = errors.Wrapf(err, "failed to yaml unmarshal extra_pod_spec, please check the configmap %s in namespace %s", configCmName, configNamespace)
return
}
}
} else {
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateModelSeederPod", "Configmap %s is not found in namespace %s", configCmName, configNamespace)
}
// When the configmap exists, these pod spec fields are overwritten (not
// merged) with the values from extra_pod_spec, including zero values.
if globalExtraPodSpec != nil {
pod.Spec.PriorityClassName = globalExtraPodSpec.PriorityClassName
pod.Spec.SchedulerName = globalExtraPodSpec.SchedulerName
pod.Spec.NodeSelector = globalExtraPodSpec.NodeSelector
pod.Spec.Affinity = globalExtraPodSpec.Affinity
pod.Spec.Tolerations = globalExtraPodSpec.Tolerations
pod.Spec.TopologySpreadConstraints = globalExtraPodSpec.TopologySpreadConstraints
pod.Spec.ServiceAccountName = globalExtraPodSpec.ServiceAccountName
}
// Runs last, after any Affinity taken from the configmap has been applied.
injectPodAffinity(&pod.Spec, opt.DynamoNimRequest)
return
}
// GenerateImageBuilderJobOption carries the inputs consumed by
// generateImageBuilderJob, which converts it directly into
// GenerateImageBuilderPodTemplateSpecOption; the two structs must therefore
// keep identical field layouts.
type GenerateImageBuilderJobOption struct {
// ImageInfo describes the image handled by the builder job.
ImageInfo ImageInfo
// DynamoNimRequest is the request the builder job is generated for.
DynamoNimRequest *nvidiacomv1alpha1.DynamoNimRequest
}
//nolint:nakedret
func (r *DynamoNimRequestReconciler) generateImageBuilderJob(ctx context.Context, opt GenerateImageBuilderJobOption) (job *batchv1.Job, err error) {
// nolint: gosimple
podTemplateSpec, err := r.generateImageBuilderPodTemplateSpec(ctx, GenerateImageBuilderPodTemplateSpecOption(opt))
if err != nil {
err = errors.Wrap(err, "generate image builder pod template spec")
return return
} }
kubeAnnotations := make(map[string]string) kubeAnnotations := make(map[string]string)
...@@ -2024,7 +1272,7 @@ func (r *DynamoNimRequestReconciler) generateImageBuilderPodTemplateSpec(ctx con ...@@ -2024,7 +1272,7 @@ func (r *DynamoNimRequestReconciler) generateImageBuilderPodTemplateSpec(ctx con
}) })
} }
var dynamoNim *schemasv1.BentoFullSchema var dynamoNim *schemas.DynamoNIM
yataiAPITokenSecretName := "" yataiAPITokenSecretName := ""
dynamoNimDownloadURL := opt.DynamoNimRequest.Spec.DownloadURL dynamoNimDownloadURL := opt.DynamoNimRequest.Spec.DownloadURL
dynamoNimDownloadHeader := "" dynamoNimDownloadHeader := ""
...@@ -2055,8 +1303,8 @@ func (r *DynamoNimRequestReconciler) generateImageBuilderPodTemplateSpec(ctx con ...@@ -2055,8 +1303,8 @@ func (r *DynamoNimRequestReconciler) generateImageBuilderPodTemplateSpec(ctx con
} }
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Got dynamoNim %s from yatai service", opt.DynamoNimRequest.Spec.BentoTag) r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Got dynamoNim %s from yatai service", opt.DynamoNimRequest.Spec.BentoTag)
if dynamoNim.TransmissionStrategy != nil && *dynamoNim.TransmissionStrategy == modelschemas.TransmissionStrategyPresignedURL { if dynamoNim.TransmissionStrategy != nil && *dynamoNim.TransmissionStrategy == schemas.TransmissionStrategyPresignedURL {
var dynamoNim_ *schemasv1.BentoSchema var dynamoNim_ *schemas.DynamoNIM
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting presigned url for dynamoNim %s from yatai service", opt.DynamoNimRequest.Spec.BentoTag) r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting presigned url for dynamoNim %s from yatai service", opt.DynamoNimRequest.Spec.BentoTag)
dynamoNim_, err = yataiClient.PresignBentoDownloadURL(ctx, dynamoNimRepositoryName, dynamoNimVersion) dynamoNim_, err = yataiClient.PresignBentoDownloadURL(ctx, dynamoNimRepositoryName, dynamoNimVersion)
if err != nil { if err != nil {
...@@ -2212,8 +1460,6 @@ echo "Done" ...@@ -2212,8 +1460,6 @@ echo "Done"
containers := make([]corev1.Container, 0) containers := make([]corev1.Container, 0)
separateModels := isSeparateModels(opt.DynamoNimRequest)
models := opt.DynamoNimRequest.Spec.Models models := opt.DynamoNimRequest.Spec.Models
modelsSeen := map[string]struct{}{} modelsSeen := map[string]struct{}{}
for _, model := range models { for _, model := range models {
...@@ -2230,123 +1476,6 @@ echo "Done" ...@@ -2230,123 +1476,6 @@ echo "Done"
} }
} }
for idx, model := range models {
if separateModels {
continue
}
modelRepositoryName, _, modelVersion := xstrings.Partition(model.Tag, ":")
modelDownloadURL := model.DownloadURL
modelDownloadHeader := ""
if modelDownloadURL == "" {
if dynamoNim == nil {
continue
}
var yataiClient_ **yataiclient.YataiClient
var yataiConf_ **commonconfig.YataiConfig
yataiClient_, yataiConf_, err = r.getYataiClientWithAuth(ctx, opt.DynamoNimRequest)
if err != nil {
err = errors.Wrap(err, "get yatai client")
return
}
if yataiClient_ == nil || yataiConf_ == nil {
err = errors.New("can't get yatai client, please check yatai configuration")
return
}
yataiClient := *yataiClient_
yataiConf := *yataiConf_
var model_ *schemasv1.ModelFullSchema
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting model %s from yatai service", model.Tag)
model_, err = yataiClient.GetModel(ctx, modelRepositoryName, modelVersion)
if err != nil {
err = errors.Wrap(err, "get model")
return
}
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Model %s is got from yatai service", model.Tag)
if model_.TransmissionStrategy != nil && *model_.TransmissionStrategy == modelschemas.TransmissionStrategyPresignedURL {
var model0 *schemasv1.ModelSchema
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting presigned url for model %s from yatai service", model.Tag)
model0, err = yataiClient.PresignModelDownloadURL(ctx, modelRepositoryName, modelVersion)
if err != nil {
err = errors.Wrap(err, "presign model download url")
return
}
r.Recorder.Eventf(opt.DynamoNimRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Presigned url for model %s is got from yatai service", model.Tag)
modelDownloadURL = model0.PresignedDownloadUrl
} else {
modelDownloadURL = fmt.Sprintf("%s/api/v1/model_repositories/%s/models/%s/download", yataiConf.Endpoint, modelRepositoryName, modelVersion)
modelDownloadHeader = fmt.Sprintf("%s: %s:%s:$%s", commonconsts.YataiApiTokenHeaderName, commonconsts.YataiImageBuilderComponentName, yataiConf.ClusterName, commonconsts.EnvYataiApiToken)
}
}
modelRepositoryDirPath := fmt.Sprintf("/workspace/buildcontext/models/%s", modelRepositoryName)
modelDirPath := filepath.Join(modelRepositoryDirPath, modelVersion)
var modelDownloadCommandOutput bytes.Buffer
err = template.Must(template.New("script").Parse(`
set -e
mkdir -p {{.ModelDirPath}}
url="{{.ModelDownloadURL}}"
echo "Downloading model {{.ModelRepositoryName}}:{{.ModelVersion}} to /tmp/downloaded.tar..."
if [[ ${url} == s3://* ]]; then
echo "Downloading from s3..."
aws s3 cp ${url} /tmp/downloaded.tar
elif [[ ${url} == gs://* ]]; then
echo "Downloading from GCS..."
gsutil cp ${url} /tmp/downloaded.tar
else
curl --fail -L -H "{{.ModelDownloadHeader}}" ${url} --output /tmp/downloaded.tar --progress-bar
fi
cd {{.ModelDirPath}}
echo "Extracting model tar file..."
tar -xvf /tmp/downloaded.tar
echo -n '{{.ModelVersion}}' > {{.ModelRepositoryDirPath}}/latest
echo "Removing model tar file..."
rm /tmp/downloaded.tar
{{if not .Privileged}}
echo "Changing directory permission..."
chown -R 1000:1000 /workspace
{{end}}
echo "Done"
`)).Execute(&modelDownloadCommandOutput, map[string]interface{}{
"ModelDirPath": modelDirPath,
"ModelDownloadURL": modelDownloadURL,
"ModelDownloadHeader": modelDownloadHeader,
"ModelRepositoryDirPath": modelRepositoryDirPath,
"ModelRepositoryName": modelRepositoryName,
"ModelVersion": modelVersion,
"Privileged": privileged,
})
if err != nil {
err = errors.Wrap(err, "failed to generate download command")
return
}
modelDownloadCommand := modelDownloadCommandOutput.String()
initContainers = append(initContainers, corev1.Container{
Name: fmt.Sprintf("model-downloader-%d", idx),
Image: internalImages.BentoDownloader,
Command: []string{
"bash",
"-c",
modelDownloadCommand,
},
VolumeMounts: volumeMounts,
Resources: downloaderContainerResources,
EnvFrom: downloaderContainerEnvFrom,
Env: []corev1.EnvVar{
{
Name: "AWS_EC2_METADATA_DISABLED",
Value: "true",
},
},
})
}
var globalExtraPodMetadata *dynamoCommon.ExtraPodMetadata var globalExtraPodMetadata *dynamoCommon.ExtraPodMetadata
var globalExtraPodSpec *dynamoCommon.ExtraPodSpec var globalExtraPodSpec *dynamoCommon.ExtraPodSpec
var globalExtraContainerEnv []corev1.EnvVar var globalExtraContainerEnv []corev1.EnvVar
...@@ -2354,14 +1483,7 @@ echo "Done" ...@@ -2354,14 +1483,7 @@ echo "Done"
var buildArgs []string var buildArgs []string
var builderArgs []string var builderArgs []string
configNamespace, err := commonconfig.GetYataiImageBuilderNamespace(ctx, func(ctx context.Context, namespace, name string) (*corev1.Secret, error) { configNamespace, err := commonconfig.GetYataiImageBuilderNamespace(ctx)
secret := &corev1.Secret{}
err := r.Get(ctx, types.NamespacedName{
Namespace: namespace,
Name: name,
}, secret)
return secret, errors.Wrap(err, "get secret")
})
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to get Yatai image builder namespace") err = errors.Wrap(err, "failed to get Yatai image builder namespace")
return return
...@@ -2826,95 +1948,12 @@ func (r *DynamoNimRequestReconciler) getHashStr(dynamoNimRequest *nvidiacomv1alp ...@@ -2826,95 +1948,12 @@ func (r *DynamoNimRequestReconciler) getHashStr(dynamoNimRequest *nvidiacomv1alp
return hashStr, nil return hashStr, nil
} }
// getJuiceFSStorageClassName returns the storage class name taken from the
// JUICEFS_STORAGE_CLASS_NAME environment variable, falling back to
// "juicefs-sc" when the variable is unset or empty.
func getJuiceFSStorageClassName() string {
	name := os.Getenv("JUICEFS_STORAGE_CLASS_NAME")
	if name == "" {
		name = "juicefs-sc"
	}
	return name
}
const ( const (
trueStr = "true" trueStr = "true"
) )
// doRegisterDynamoComponent performs a single registration of the image
// builder component with the yatai service, bounded by a five-minute timeout.
//
// If no yatai client or configuration is available the registration is
// skipped and nil is returned. Errors from obtaining the client, resolving
// the image builder namespace, or the registration call are returned wrapped.
func (r *DynamoNimRequestReconciler) doRegisterDynamoComponent() (err error) {
	logger := log.Log.WithValues("func", "doRegisterYataiComponent")

	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
	defer cancel()

	logger.Info("getting yatai client")
	clientRef, confRef, err := r.getYataiClient(ctx)
	if err != nil {
		return errors.Wrap(err, "get yatai client")
	}
	if clientRef == nil || confRef == nil {
		logger.Info("can't get yatai client, skip registering")
		return nil
	}
	client, conf := *clientRef, *confRef

	// Secrets are read through the reconciler's own client.
	getSecret := func(ctx context.Context, namespace, name string) (*corev1.Secret, error) {
		secret := &corev1.Secret{}
		getErr := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, secret)
		return secret, errors.Wrap(getErr, "get secret")
	}
	imageBuilderNamespace, err := commonconfig.GetYataiImageBuilderNamespace(ctx, getSecret)
	if err != nil {
		return errors.Wrap(err, "get yatai image builder namespace")
	}

	// Each use gets its own map so the two fields never alias each other.
	selector := func() map[string]string {
		return map[string]string{
			"app.kubernetes.io/name": "yatai-image-builder",
		}
	}
	registration := &schemasv1.RegisterYataiComponentSchema{
		Name:           modelschemas.YataiComponentNameImageBuilder,
		KubeNamespace:  imageBuilderNamespace,
		Version:        version.Version,
		SelectorLabels: selector(),
		Manifest: &modelschemas.YataiComponentManifestSchema{
			SelectorLabels:   selector(),
			LatestCRDVersion: "v1alpha1",
		},
	}

	_, err = client.RegisterYataiComponent(ctx, conf.ClusterName, registration)
	return errors.Wrap(err, "register yatai component")
}
// registerDynamoComponent registers the component once immediately and then
// again on every tick of a five-minute ticker. Failures are logged and do not
// stop the loop; the function never returns.
func (r *DynamoNimRequestReconciler) registerDynamoComponent() {
	logger := log.Log.WithValues("func", "registerYataiComponent")

	attempt := func() {
		if regErr := r.doRegisterDynamoComponent(); regErr != nil {
			logger.Error(regErr, "registerYataiComponent")
		}
	}

	attempt()

	// The ticker is created only after the first attempt has finished.
	ticker := time.NewTicker(5 * time.Minute)
	for {
		<-ticker.C
		attempt()
	}
}
// SetupWithManager sets up the controller with the Manager. // SetupWithManager sets up the controller with the Manager.
func (r *DynamoNimRequestReconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *DynamoNimRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
logs := log.Log.WithValues("func", "SetupWithManager")
if os.Getenv("DISABLE_YATAI_COMPONENT_REGISTRATION") != trueStr {
go r.registerDynamoComponent()
} else {
logs.Info("yatai component registration is disabled")
}
err := ctrl.NewControllerManagedBy(mgr). err := ctrl.NewControllerManagedBy(mgr).
For(&nvidiacomv1alpha1.DynamoNimRequest{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). For(&nvidiacomv1alpha1.DynamoNimRequest{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
......
...@@ -27,12 +27,11 @@ import ( ...@@ -27,12 +27,11 @@ import (
"emperror.dev/errors" "emperror.dev/errors"
compounaiCommon "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common" compounaiCommon "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/modelschemas" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemas"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/schemasv1"
yataiclient "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/yatai-client" yataiclient "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/dynamo/yatai-client"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1"
commonconfig "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/config" commonconfig "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/config"
commonconsts "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/pkg/dynamo/consts" commonconsts "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/consts"
"github.com/huandu/xstrings" "github.com/huandu/xstrings"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors"
...@@ -79,17 +78,17 @@ type ServiceConfig struct { ...@@ -79,17 +78,17 @@ type ServiceConfig struct {
Config Config `yaml:"config"` Config Config `yaml:"config"`
} }
func RetrieveDynamoNimDownloadURL(ctx context.Context, dynamoDeployment *v1alpha1.DynamoDeployment, secretGetter SecretGetter, recorder EventRecorder) (*string, *string, error) { func RetrieveDynamoNimDownloadURL(ctx context.Context, dynamoDeployment *v1alpha1.DynamoDeployment, recorder EventRecorder) (*string, *string, error) {
dynamoNimDownloadURL := "" dynamoNimDownloadURL := ""
dynamoNimApiToken := "" dynamoNimApiToken := ""
var dynamoNim *schemasv1.BentoFullSchema var dynamoNim *schemas.DynamoNIM
dynamoNimRepositoryName, _, dynamoNimVersion := xstrings.Partition(dynamoDeployment.Spec.DynamoNim, ":") dynamoNimRepositoryName, _, dynamoNimVersion := xstrings.Partition(dynamoDeployment.Spec.DynamoNim, ":")
var err error var err error
var yataiClient_ **yataiclient.YataiClient var yataiClient_ **yataiclient.YataiClient
var yataiConf_ **commonconfig.YataiConfig var yataiConf_ **commonconfig.YataiConfig
yataiClient_, yataiConf_, err = GetYataiClient(ctx, secretGetter) yataiClient_, yataiConf_, err = GetYataiClient(ctx)
if err != nil { if err != nil {
err = errors.Wrap(err, "get yatai client") err = errors.Wrap(err, "get yatai client")
return nil, nil, err return nil, nil, err
...@@ -111,8 +110,8 @@ func RetrieveDynamoNimDownloadURL(ctx context.Context, dynamoDeployment *v1alpha ...@@ -111,8 +110,8 @@ func RetrieveDynamoNimDownloadURL(ctx context.Context, dynamoDeployment *v1alpha
} }
recorder.Eventf(dynamoDeployment, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Got dynamoNim %s from yatai service", dynamoDeployment.Spec.DynamoNim) recorder.Eventf(dynamoDeployment, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Got dynamoNim %s from yatai service", dynamoDeployment.Spec.DynamoNim)
if dynamoNim.TransmissionStrategy != nil && *dynamoNim.TransmissionStrategy == modelschemas.TransmissionStrategyPresignedURL { if dynamoNim.TransmissionStrategy != nil && *dynamoNim.TransmissionStrategy == schemas.TransmissionStrategyPresignedURL {
var dynamoNim_ *schemasv1.BentoSchema var dynamoNim_ *schemas.DynamoNIM
recorder.Eventf(dynamoDeployment, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting presigned url for dynamoNim %s from yatai service", dynamoDeployment.Spec.DynamoNim) recorder.Eventf(dynamoDeployment, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting presigned url for dynamoNim %s from yatai service", dynamoDeployment.Spec.DynamoNim)
dynamoNim_, err = yataiClient.PresignBentoDownloadURL(ctx, dynamoNimRepositoryName, dynamoNimVersion) dynamoNim_, err = yataiClient.PresignBentoDownloadURL(ctx, dynamoNimRepositoryName, dynamoNimVersion)
if err != nil { if err != nil {
...@@ -175,35 +174,8 @@ func RetrieveDynamoNIMConfigurationFile(ctx context.Context, url string, yataiAp ...@@ -175,35 +174,8 @@ func RetrieveDynamoNIMConfigurationFile(ctx context.Context, url string, yataiAp
return yamlContent, nil return yamlContent, nil
} }
func GetYataiClientWithAuth(ctx context.Context, dynamoNimRequest *v1alpha1.DynamoNimRequest, secretGetter SecretGetter) (**yataiclient.YataiClient, **commonconfig.YataiConfig, error) { func GetYataiClient(ctx context.Context) (yataiClient **yataiclient.YataiClient, yataiConf **commonconfig.YataiConfig, err error) {
orgId, ok := dynamoNimRequest.Labels[commonconsts.NgcOrganizationHeaderName] yataiConf_, err := commonconfig.GetYataiConfig(ctx)
if !ok {
orgId = commonconsts.DefaultOrgId
}
userId, ok := dynamoNimRequest.Labels[commonconsts.NgcUserHeaderName]
if !ok {
userId = commonconsts.DefaultUserId
}
auth := yataiclient.DynamoAuthHeaders{
OrgId: orgId,
UserId: userId,
}
client, yataiConf, err := GetYataiClient(ctx, secretGetter)
if err != nil {
return nil, nil, err
}
(*client).SetAuth(auth)
return client, yataiConf, err
}
type SecretGetter func(ctx context.Context, namespace, name string) (*corev1.Secret, error)
func GetYataiClient(ctx context.Context, secretGetter SecretGetter) (yataiClient **yataiclient.YataiClient, yataiConf **commonconfig.YataiConfig, err error) {
yataiConf_, err := commonconfig.GetYataiConfig(ctx, secretGetter, commonconsts.YataiImageBuilderComponentName, false)
isNotFound := k8serrors.IsNotFound(err) isNotFound := k8serrors.IsNotFound(err)
if err != nil && !isNotFound { if err != nil && !isNotFound {
err = errors.Wrap(err, "get yatai config") err = errors.Wrap(err, "get yatai config")
...@@ -237,8 +209,8 @@ func ParseDynamoNIMConfig(ctx context.Context, yamlContent *bytes.Buffer) (*Dyna ...@@ -237,8 +209,8 @@ func ParseDynamoNIMConfig(ctx context.Context, yamlContent *bytes.Buffer) (*Dyna
return &config, err return &config, err
} }
func GetDynamoNIMConfig(ctx context.Context, dynamoDeployment *v1alpha1.DynamoDeployment, secretGetter SecretGetter, recorder EventRecorder) (*DynamoNIMConfig, error) { func GetDynamoNIMConfig(ctx context.Context, dynamoDeployment *v1alpha1.DynamoDeployment, recorder EventRecorder) (*DynamoNIMConfig, error) {
dynamoNimDownloadURL, dynamoNimApiToken, err := RetrieveDynamoNimDownloadURL(ctx, dynamoDeployment, secretGetter, recorder) dynamoNimDownloadURL, dynamoNimApiToken, err := RetrieveDynamoNimDownloadURL(ctx, dynamoDeployment, recorder)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment