Unverified Commit 02a33f15 authored by julienmancuso's avatar julienmancuso Committed by GitHub
Browse files

feat: add dynamoDeployment CR finalizer (#623)

parent cb0ceb81
......@@ -23,7 +23,7 @@ version: 25.2.0-rc3
home: https://nvidia.com
dependencies:
- name: dynamo-operator
version: 0.1.1
version: 0.1.2
repository: file://components/operator
condition: dynamo-operator.enabled
- name: dynamo-api-store
......
......@@ -27,7 +27,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.1
version: 0.1.2
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
......@@ -35,5 +35,5 @@ version: 0.1.1
appVersion: "0.1.0"
dependencies:
- name: dynamo-crds
version: 0.1.1
version: 0.1.2
repository: file://charts/dynamo-crds
\ No newline at end of file
......@@ -16,5 +16,5 @@ apiVersion: v2
name: dynamo-crds
description: A Helm chart for CRDs of dynamo operator
type: application
version: 0.1.1
version: 0.1.2
dependencies: []
\ No newline at end of file
......@@ -389,6 +389,8 @@ spec:
minReplicas:
type: integer
type: object
dynamoNamespace:
type: string
dynamoNim:
type: string
dynamoTag:
......
......@@ -36,7 +36,7 @@ type DynamoDeploymentSpec struct {
// value is the DynamoNimDeployment override for that service
// if not set, the DynamoNimDeployment will be used as is
// +kubebuilder:validation:Optional
Services map[string]*DynamoNimDeployment `json:"services,omitempty"`
Services map[string]*DynamoNimDeploymentOverridesSpec `json:"services,omitempty"`
// Environment variables to be set in the deployment
// +kubebuilder:validation:Optional
Envs []corev1.EnvVar `json:"envs,omitempty"`
......
......@@ -36,19 +36,30 @@ const (
// DynamoNimDeploymentSpec defines the desired state of DynamoNimDeployment
type DynamoNimDeploymentSpec struct {
DynamoNim string `json:"dynamoNim"`
// contains the tag of the DynamoNim: for example, "my_package:MyService"
DynamoTag string `json:"dynamoTag"`
DynamoNimDeploymentSharedSpec `json:",inline"`
}
// DynamoNimDeploymentOverridesSpec holds per-service override values. It embeds
// DynamoNimDeploymentSharedSpec inline, so the shared fields appear directly in
// its JSON representation.
type DynamoNimDeploymentOverridesSpec struct {
DynamoNimDeploymentSharedSpec `json:",inline"`
}
type DynamoNimDeploymentSharedSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Annotations map[string]string `json:"annotations,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
DynamoNim string `json:"dynamoNim"`
// contains the tag of the DynamoNim: for example, "my_package:MyService"
DynamoTag string `json:"dynamoTag"`
// contains the name of the service
ServiceName string `json:"serviceName,omitempty"`
// dynamo namespace of the service (allows to override the dynamo namespace of the service defined in annotations inside the dynamo archive)
DynamoNamespace *string `json:"dynamoNamespace,omitempty"`
Resources *dynamoCommon.Resources `json:"resources,omitempty"`
Autoscaling *Autoscaling `json:"autoscaling,omitempty"`
Envs []corev1.EnvVar `json:"envs,omitempty"`
......
......@@ -200,15 +200,15 @@ func (in *DynamoDeploymentSpec) DeepCopyInto(out *DynamoDeploymentSpec) {
*out = *in
if in.Services != nil {
in, out := &in.Services, &out.Services
*out = make(map[string]*DynamoNimDeployment, len(*in))
*out = make(map[string]*DynamoNimDeploymentOverridesSpec, len(*in))
for key, val := range *in {
var outVal *DynamoNimDeployment
var outVal *DynamoNimDeploymentOverridesSpec
if val == nil {
(*out)[key] = nil
} else {
inVal := (*in)[key]
in, out := &inVal, &outVal
*out = new(DynamoNimDeployment)
*out = new(DynamoNimDeploymentOverridesSpec)
(*in).DeepCopyInto(*out)
}
(*out)[key] = outVal
......@@ -342,7 +342,23 @@ func (in *DynamoNimDeploymentList) DeepCopyObject() runtime.Object {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DynamoNimDeploymentSpec) DeepCopyInto(out *DynamoNimDeploymentSpec) {
func (in *DynamoNimDeploymentOverridesSpec) DeepCopyInto(out *DynamoNimDeploymentOverridesSpec) {
// Start from a shallow copy of the whole struct.
*out = *in
// Then delegate to the embedded shared spec's own DeepCopyInto.
in.DynamoNimDeploymentSharedSpec.DeepCopyInto(&out.DynamoNimDeploymentSharedSpec)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoNimDeploymentOverridesSpec.
// A nil receiver yields nil.
func (in *DynamoNimDeploymentOverridesSpec) DeepCopy() *DynamoNimDeploymentOverridesSpec {
// Nothing to copy for a nil receiver.
if in == nil {
return nil
}
// Allocate the destination and let DeepCopyInto populate it.
out := new(DynamoNimDeploymentOverridesSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DynamoNimDeploymentSharedSpec) DeepCopyInto(out *DynamoNimDeploymentSharedSpec) {
*out = *in
if in.Annotations != nil {
in, out := &in.Annotations, &out.Annotations
......@@ -358,6 +374,11 @@ func (in *DynamoNimDeploymentSpec) DeepCopyInto(out *DynamoNimDeploymentSpec) {
(*out)[key] = val
}
}
if in.DynamoNamespace != nil {
in, out := &in.DynamoNamespace, &out.DynamoNamespace
*out = new(string)
**out = **in
}
if in.Resources != nil {
in, out := &in.Resources, &out.Resources
*out = new(common.Resources)
......@@ -430,6 +451,22 @@ func (in *DynamoNimDeploymentSpec) DeepCopyInto(out *DynamoNimDeploymentSpec) {
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoNimDeploymentSharedSpec.
// A nil receiver yields nil.
func (in *DynamoNimDeploymentSharedSpec) DeepCopy() *DynamoNimDeploymentSharedSpec {
// Nothing to copy for a nil receiver.
if in == nil {
return nil
}
// Allocate the destination and let DeepCopyInto populate it.
out := new(DynamoNimDeploymentSharedSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DynamoNimDeploymentSpec) DeepCopyInto(out *DynamoNimDeploymentSpec) {
// Start from a shallow copy of the whole struct.
*out = *in
// Then delegate to the embedded shared spec's own DeepCopyInto.
in.DynamoNimDeploymentSharedSpec.DeepCopyInto(&out.DynamoNimDeploymentSharedSpec)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoNimDeploymentSpec.
func (in *DynamoNimDeploymentSpec) DeepCopy() *DynamoNimDeploymentSpec {
if in == nil {
......
......@@ -23,9 +23,11 @@ import (
"crypto/tls"
"flag"
"os"
"time"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
clientv3 "go.etcd.io/etcd/client/v3"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"sigs.k8s.io/controller-runtime/pkg/cache"
......@@ -41,6 +43,7 @@ import (
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/controller"
commonController "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/etcd"
istioclientsetscheme "istio.io/client-go/pkg/clientset/versioned/scheme"
//+kubebuilder:scaffold:imports
)
......@@ -150,13 +153,25 @@ func main() {
os.Exit(1)
}
// Create etcd client
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
DialKeepAliveTime: 10 * time.Second,
DialKeepAliveTimeout: 3 * time.Second,
})
if err != nil {
setupLog.Error(err, "unable to create etcd client")
os.Exit(1)
}
if err = (&controller.DynamoNimDeploymentReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("yatai-deployment"),
Config: ctrlConfig,
NatsAddr: natsAddr,
EtcdAddr: etcdAddr,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("yatai-deployment"),
Config: ctrlConfig,
NatsAddr: natsAddr,
EtcdAddr: etcdAddr,
EtcdStorage: etcd.NewStorage(cli),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamoNimDeployment")
os.Exit(1)
......
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -389,6 +389,8 @@ spec:
minReplicas:
type: integer
type: object
dynamoNamespace:
type: string
dynamoNim:
type: string
dynamoTag:
......
......@@ -11,7 +11,6 @@ require (
github.com/cisco-open/k8s-objectmatcher v1.9.0
github.com/ettle/strcase v0.2.0
github.com/huandu/xstrings v1.4.0
github.com/jinzhu/copier v0.4.0
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1
......@@ -20,6 +19,7 @@ require (
github.com/rs/xid v1.4.0
github.com/sergeymakinen/go-quote v1.1.0
github.com/sirupsen/logrus v1.9.3
go.etcd.io/etcd/client/v3 v3.5.14
gopkg.in/yaml.v2 v2.4.0
istio.io/api v1.23.1
istio.io/client-go v1.23.1
......@@ -36,6 +36,8 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
......@@ -76,6 +78,8 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.etcd.io/etcd/api/v3 v3.5.14 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
......@@ -88,6 +92,8 @@ require (
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
......
......@@ -10,6 +10,10 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cisco-open/k8s-objectmatcher v1.9.0 h1:/sfuO0BD09fpynZjXsqeZrh28Juc4VEwc2P6Ov/Q6fM=
github.com/cisco-open/k8s-objectmatcher v1.9.0/go.mod h1:CH4E6qAK+q+JwKFJn0DaTNqxrbmWCaDQzGthKLK4nZ0=
github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4=
github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
......@@ -47,6 +51,7 @@ github.com/go-openapi/swag v0.22.8 h1:/9RjDSQ0vbFR+NyjGMkFTsA1IA0fmhKSThmfGZjicb
github.com/go-openapi/swag v0.22.8/go.mod h1:6QT22icPLEqAM/z/TChgb4WAveCHF92+2gF0CNjHpPI=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
......@@ -71,8 +76,6 @@ github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
......@@ -146,6 +149,12 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/etcd/api/v3 v3.5.14 h1:vHObSCxyB9zlF60w7qzAdTcGaglbJOpSj1Xj9+WGxq0=
go.etcd.io/etcd/api/v3 v3.5.14/go.mod h1:BmtWcRlQvwa1h3G2jvKYwIQy4PkHlDej5t7uLMUdJUU=
go.etcd.io/etcd/client/pkg/v3 v3.5.14 h1:SaNH6Y+rVEdxfpA2Jr5wkEvN6Zykme5+YnbCkxvuWxQ=
go.etcd.io/etcd/client/pkg/v3 v3.5.14/go.mod h1:8uMgAokyG1czCtIdsq+AGyYQMvpIKnSvPjFMunkgeZI=
go.etcd.io/etcd/client/v3 v3.5.14 h1:CWfRs4FDaDoSz81giL7zPpZH2Z35tbOrAJkkjMqOupg=
go.etcd.io/etcd/client/v3 v3.5.14/go.mod h1:k3XfdV/VIHy/97rqWjoUzrj9tk7GgJGH9J8L4dNXmAk=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
......@@ -200,6 +209,10 @@ gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw
gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
......
......@@ -46,6 +46,10 @@ const (
PendingState = "pending"
)
// etcdStorage is the etcd operation set the reconcilers in this package depend
// on. It is declared here, on the consumer side, so a mock can be substituted
// in tests.
type etcdStorage interface {
// DeleteKeys asks the backing store to delete the keys selected by prefix
// and reports any failure.
DeleteKeys(ctx context.Context, prefix string) error
}
// DynamoDeploymentReconciler reconciles a DynamoDeployment object
type DynamoDeploymentReconciler struct {
client.Client
......@@ -106,6 +110,15 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
logger.Info("Reconciliation done")
}()
deleted, err := commonController.HandleFinalizer(ctx, dynamoDeployment, r.Client, r)
if err != nil {
reason = "failed_to_handle_the_finalizer"
return ctrl.Result{}, err
}
if deleted {
return ctrl.Result{}, nil
}
// fetch the DynamoNIMConfig
dynamoNIMConfig, err := nim.GetDynamoNIMConfig(ctx, dynamoDeployment, r.Recorder)
if err != nil {
......@@ -114,7 +127,7 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
// generate the DynamoNimDeployments from the config
dynamoNimDeployments, err := nim.GenerateDynamoNIMDeployments(dynamoDeployment, dynamoNIMConfig)
dynamoNimDeployments, err := nim.GenerateDynamoNIMDeployments(ctx, dynamoDeployment, dynamoNIMConfig)
if err != nil {
reason = "failed_to_generate_the_DynamoNimDeployments"
return ctrl.Result{}, err
......@@ -123,7 +136,7 @@ func (r *DynamoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
// merge the DynamoNimDeployments with the DynamoNimDeployments from the CRD
for serviceName, deployment := range dynamoNimDeployments {
if _, ok := dynamoDeployment.Spec.Services[serviceName]; ok {
err := mergo.Merge(deployment, dynamoDeployment.Spec.Services[serviceName], mergo.WithOverride)
err := mergo.Merge(&deployment.Spec.DynamoNimDeploymentSharedSpec, dynamoDeployment.Spec.Services[serviceName].DynamoNimDeploymentSharedSpec, mergo.WithOverride)
if err != nil {
reason = "failed_to_merge_the_DynamoNimDeployments"
return ctrl.Result{}, err
......@@ -211,6 +224,11 @@ func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
return merged
}
// FinalizeResource is the cleanup hook for a DynamoDeployment; the reconciler
// passes itself to HandleFinalizer, which calls this method before removing
// the finalizer from an object that is being deleted. It currently performs no
// cleanup and always returns nil.
func (r *DynamoDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoDeployment) error {
// for now doing nothing
return nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *DynamoDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
......
......@@ -42,6 +42,7 @@ import (
commonconfig "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/config"
commonconsts "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/controller_common"
commonController "github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/internal/envoy"
"github.com/cisco-open/k8s-objectmatcher/patch"
"github.com/huandu/xstrings"
......@@ -92,11 +93,12 @@ var ServicePortHTTPNonProxy = commonconsts.BentoServicePort + 1
// DynamoNimDeploymentReconciler reconciles a DynamoNimDeployment object
type DynamoNimDeploymentReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
Config controller_common.Config
NatsAddr string
EtcdAddr string
Scheme *runtime.Scheme
Recorder record.EventRecorder
Config controller_common.Config
NatsAddr string
EtcdAddr string
EtcdStorage etcdStorage
}
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamonimdeployments,verbs=get;list;watch;create;update;patch;delete
......@@ -147,6 +149,15 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
logs = logs.WithValues("dynamoNimDeployment", dynamoNimDeployment.Name, "namespace", dynamoNimDeployment.Namespace)
deleted, err := commonController.HandleFinalizer(ctx, dynamoNimDeployment, r.Client, r)
if err != nil {
logs.Error(err, "Failed to handle finalizer")
return ctrl.Result{}, err
}
if deleted {
return ctrl.Result{}, nil
}
if len(dynamoNimDeployment.Status.Conditions) == 0 {
logs.Info("Starting to reconcile DynamoNimDeployment")
logs.Info("Initializing DynamoNimDeployment status")
......@@ -391,6 +402,20 @@ func (r *DynamoNimDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.
return
}
// FinalizeResource runs the cleanup for a DynamoNimDeployment that is being
// deleted.
//
// When the spec carries both a service name and a non-empty dynamo namespace,
// the etcd keys under "/<dynamoNamespace>/components/<serviceName>" are deleted
// through r.EtcdStorage. In every other case nothing is deleted and nil is
// returned. A deletion failure is logged and returned to the caller.
//
// NOTE(review): the prefix carries no trailing separator, so a prefix-based
// delete may also match keys of services whose name merely starts with
// serviceName — confirm against the etcd key layout.
func (r *DynamoNimDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoNimDeployment *v1alpha1.DynamoNimDeployment) error {
	logger := log.FromContext(ctx)
	logger.Info("Finalizing the DynamoNimDeployment", "dynamoNimDeployment", dynamoNimDeployment)

	serviceName := dynamoNimDeployment.Spec.ServiceName
	namespace := dynamoNimDeployment.Spec.DynamoNamespace

	// Without both identifiers there is no etcd prefix to clean up.
	if serviceName == "" || namespace == nil || *namespace == "" {
		return nil
	}

	logger.Info("Deleting the etcd keys for the service", "service", serviceName, "dynamoNamespace", *namespace)
	prefix := fmt.Sprintf("/%s/components/%s", *namespace, serviceName)
	if err := r.EtcdStorage.DeleteKeys(ctx, prefix); err != nil {
		logger.Error(err, "Failed to delete the etcd keys for the service", "service", serviceName, "dynamoNamespace", *namespace)
		return err
	}
	return nil
}
func (r *DynamoNimDeploymentReconciler) computeAvailableStatusCondition(ctx context.Context, req ctrl.Request, deployment *appsv1.Deployment) error {
logs := log.FromContext(ctx)
if IsDeploymentReady(deployment) {
......@@ -1225,6 +1250,9 @@ monitoring.options.insecure=true`
if opt.dynamoNimDeployment.Spec.ServiceName != "" {
args = append(args, []string{"--service-name", opt.dynamoNimDeployment.Spec.ServiceName}...)
args = append(args, opt.dynamoNimDeployment.Spec.DynamoTag)
if opt.dynamoNimDeployment.Spec.DynamoNamespace != nil && *opt.dynamoNimDeployment.Spec.DynamoNamespace != "" {
args = append(args, fmt.Sprintf("--%s.ServiceArgs.dynamo.namespace=%s", opt.dynamoNimDeployment.Spec.ServiceName, *opt.dynamoNimDeployment.Spec.DynamoNamespace))
}
}
if len(opt.dynamoNimDeployment.Spec.Envs) > 0 {
......
......@@ -20,8 +20,11 @@
package controller
import (
"context"
"fmt"
"testing"
"github.com/ai-dynamo/dynamo/deploy/dynamo/operator/api/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
......@@ -144,3 +147,85 @@ func TestIsDeploymentReady(t *testing.T) {
})
}
}
// mockEtcdStorage is a test double for the etcdStorage interface whose
// behaviour is supplied per test through deleteKeysFunc.
type mockEtcdStorage struct {
// deleteKeysFunc is invoked by DeleteKeys; it must be set before use.
deleteKeysFunc func(ctx context.Context, prefix string) error
}
// DeleteKeys forwards the call, unchanged, to deleteKeysFunc.
func (m *mockEtcdStorage) DeleteKeys(ctx context.Context, prefix string) error {
return m.deleteKeysFunc(ctx, prefix)
}
func TestDynamoNimDeploymentReconciler_FinalizeResource(t *testing.T) {
type fields struct {
EtcdStorage etcdStorage
}
type args struct {
ctx context.Context
dynamoNimDeployment *v1alpha1.DynamoNimDeployment
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
name: "delete etcd keys",
fields: fields{
EtcdStorage: &mockEtcdStorage{
deleteKeysFunc: func(ctx context.Context, prefix string) error {
if prefix == "/default/components/service1" {
return nil
}
return fmt.Errorf("invalid prefix: %s", prefix)
},
},
},
args: args{
ctx: context.Background(),
dynamoNimDeployment: &v1alpha1.DynamoNimDeployment{
Spec: v1alpha1.DynamoNimDeploymentSpec{
DynamoNimDeploymentSharedSpec: v1alpha1.DynamoNimDeploymentSharedSpec{
ServiceName: "service1",
DynamoNamespace: &[]string{"default"}[0],
},
},
},
},
wantErr: false,
},
{
name: "delete etcd keys (error)",
fields: fields{
EtcdStorage: &mockEtcdStorage{
deleteKeysFunc: func(ctx context.Context, prefix string) error {
return fmt.Errorf("invalid prefix: %s", prefix)
},
},
},
args: args{
ctx: context.Background(),
dynamoNimDeployment: &v1alpha1.DynamoNimDeployment{
Spec: v1alpha1.DynamoNimDeploymentSpec{
DynamoNimDeploymentSharedSpec: v1alpha1.DynamoNimDeploymentSharedSpec{
ServiceName: "service1",
DynamoNamespace: &[]string{"default"}[0],
},
},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &DynamoNimDeploymentReconciler{
EtcdStorage: tt.fields.EtcdStorage,
}
if err := r.FinalizeResource(tt.args.ctx, tt.args.dynamoNimDeployment); (err != nil) != tt.wantErr {
t.Errorf("DynamoNimDeploymentReconciler.FinalizeResource() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
package controller_common
import (
"context"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)
const (
// finalizerName is the finalizer string that AddFinalizer, RemoveFinalizer
// and ContainsFinalizer operate on.
finalizerName = "nvidia.com/finalizer"
)
// AddFinalizer adds finalizerName to obj via controllerutil.AddFinalizer.
// It does not write the object back; persisting the change is left to the
// caller. The value returned by controllerutil.AddFinalizer is discarded.
func AddFinalizer(obj client.Object) {
controllerutil.AddFinalizer(obj, finalizerName)
}
// RemoveFinalizer removes finalizerName from obj via
// controllerutil.RemoveFinalizer. It does not write the object back;
// persisting the change is left to the caller. The value returned by
// controllerutil.RemoveFinalizer is discarded.
func RemoveFinalizer(obj client.Object) {
controllerutil.RemoveFinalizer(obj, finalizerName)
}
// ContainsFinalizer reports whether obj currently carries finalizerName, as
// determined by controllerutil.ContainsFinalizer.
func ContainsFinalizer(obj client.Object) bool {
return controllerutil.ContainsFinalizer(obj, finalizerName)
}
// Finalizer is implemented by types that clean up for an object of type T that
// is being deleted. HandleFinalizer calls FinalizeResource before removing the
// finalizer from the object.
type Finalizer[T client.Object] interface {
// FinalizeResource performs the cleanup for obj. A non-nil error makes
// HandleFinalizer return that error without removing the finalizer.
FinalizeResource(ctx context.Context, obj T) error
}
// HandleFinalizer manages the package finalizer on obj during a reconcile pass.
//
//   - obj is not being deleted (zero deletion timestamp) and lacks the
//     finalizer: the finalizer is added and the object is persisted with
//     writer.Update.
//   - obj is being deleted and still carries the finalizer:
//     finalizer.FinalizeResource is called, then the finalizer is removed and
//     the object is persisted with writer.Update.
//   - otherwise nothing is done.
//
// It returns true only when finalization ran and the finalizer was removed
// successfully. Every other path returns false. An error from FinalizeResource
// or from writer.Update is logged and returned.
func HandleFinalizer[T client.Object](ctx context.Context, obj T, writer client.Writer, finalizer Finalizer[T]) (bool, error) {
	logger := log.FromContext(ctx)

	beingDeleted := !obj.GetDeletionTimestamp().IsZero()
	hasFinalizer := ContainsFinalizer(obj)

	switch {
	case !beingDeleted && !hasFinalizer:
		// Live object without our finalizer: register it and persist.
		logger.Info("Adding finalizer to object", "resourceVersion", obj.GetResourceVersion())
		AddFinalizer(obj)
		if err := writer.Update(ctx, obj); err != nil {
			logger.Error(err, "Failed to add finalizer")
			return false, err
		}
		logger.Info("Finalizer added to object", "resourceVersion", obj.GetResourceVersion())

	case beingDeleted && hasFinalizer:
		// Object is going away: run the cleanup first, and only drop the
		// finalizer once the cleanup succeeded.
		logger.Info("Calling finalizer", "resourceVersion", obj.GetResourceVersion())
		if err := finalizer.FinalizeResource(ctx, obj); err != nil {
			logger.Error(err, "Failed to call finalizer")
			return false, err
		}
		logger.Info("Removing finalizer from object", "resourceVersion", obj.GetResourceVersion())
		RemoveFinalizer(obj)
		if err := writer.Update(ctx, obj); err != nil {
			logger.Error(err, "Failed to remove finalizer")
			return false, err
		}
		logger.Info("Finalizer removed from object", "resourceVersion", obj.GetResourceVersion())
		return true, nil
	}

	return false, nil
}
package controller_common
import (
"context"
"errors"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// FinalizerMock is a test double for Finalizer[T]; its behaviour is supplied
// through FinalizeResourceFunc.
type FinalizerMock[T client.Object] struct {
// FinalizeResourceFunc is invoked by FinalizeResource; it must be set
// before FinalizeResource is called.
FinalizeResourceFunc func(ctx context.Context, obj T) error
}
// FinalizeResource forwards the call, unchanged, to FinalizeResourceFunc.
func (f *FinalizerMock[T]) FinalizeResource(ctx context.Context, obj T) error {
return f.FinalizeResourceFunc(ctx, obj)
}
// ObjectMock is a test double for client.Object. It embeds the interface
// (left nil in these tests) and overrides only the methods below; each getter
// delegates to the matching func field, which must be set before that getter
// is called.
type ObjectMock struct {
client.Object
GetDeletionTimestampFunc func() *metav1.Time
GetResourceVersionFunc func() string
GetFinalizersFunc func() []string
}
// GetDeletionTimestamp returns whatever GetDeletionTimestampFunc returns.
func (o *ObjectMock) GetDeletionTimestamp() *metav1.Time {
return o.GetDeletionTimestampFunc()
}
// GetFinalizers returns whatever GetFinalizersFunc returns.
func (o *ObjectMock) GetFinalizers() []string {
return o.GetFinalizersFunc()
}
// SetFinalizers is intentionally a no-op: the finalizers argument is ignored,
// so GetFinalizers keeps returning whatever GetFinalizersFunc provides.
func (o *ObjectMock) SetFinalizers(finalizers []string) {
}
// GetResourceVersion returns whatever GetResourceVersionFunc returns.
func (o *ObjectMock) GetResourceVersion() string {
return o.GetResourceVersionFunc()
}
// WriterMock is a test double for client.Writer. It embeds the interface and
// overrides only Update, whose behaviour is supplied through UpdateFunc.
type WriterMock struct {
client.Writer
// UpdateFunc is invoked by Update; it must be set before Update is called.
UpdateFunc func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error
}
// Update forwards the call, unchanged, to UpdateFunc.
func (w *WriterMock) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return w.UpdateFunc(ctx, obj, opts...)
}
func TestHandleFinalizer(t *testing.T) {
type args struct {
ctx context.Context
obj client.Object
writer client.Writer
finalizer Finalizer[client.Object]
}
tests := []struct {
name string
args args
want bool
wantErr bool
}{
{
name: "deleted object with finalizer - nominal case",
args: args{
ctx: context.Background(),
obj: &ObjectMock{
GetDeletionTimestampFunc: func() *metav1.Time {
return &metav1.Time{Time: time.Now()}
},
GetFinalizersFunc: func() []string {
return []string{finalizerName}
},
GetResourceVersionFunc: func() string {
return "1"
},
},
writer: &WriterMock{
UpdateFunc: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return nil
},
},
finalizer: &FinalizerMock[client.Object]{
FinalizeResourceFunc: func(ctx context.Context, obj client.Object) error {
return nil
},
},
},
want: true,
wantErr: false,
},
{
name: "deleted object with finalizer - object update error",
args: args{
ctx: context.Background(),
obj: &ObjectMock{
GetDeletionTimestampFunc: func() *metav1.Time {
return &metav1.Time{Time: time.Now()}
},
GetFinalizersFunc: func() []string {
return []string{finalizerName}
},
GetResourceVersionFunc: func() string {
return "1"
},
},
writer: &WriterMock{
UpdateFunc: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return errors.New("update error")
},
},
finalizer: &FinalizerMock[client.Object]{
FinalizeResourceFunc: func(ctx context.Context, obj client.Object) error {
return nil
},
},
},
want: false,
wantErr: true,
},
{
name: "deleted object with finalizer - finalize error",
args: args{
ctx: context.Background(),
obj: &ObjectMock{
GetDeletionTimestampFunc: func() *metav1.Time {
return &metav1.Time{Time: time.Now()}
},
GetFinalizersFunc: func() []string {
return []string{finalizerName}
},
GetResourceVersionFunc: func() string {
return "1"
},
},
finalizer: &FinalizerMock[client.Object]{
FinalizeResourceFunc: func(ctx context.Context, obj client.Object) error {
return errors.New("finalize error")
},
},
},
want: false,
wantErr: true,
},
{
name: "non deleted object without finalizer - nominal case",
args: args{
ctx: context.Background(),
obj: &ObjectMock{
GetDeletionTimestampFunc: func() *metav1.Time {
return nil
},
GetFinalizersFunc: func() []string {
return []string{}
},
GetResourceVersionFunc: func() string {
return "1"
},
},
writer: &WriterMock{
UpdateFunc: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return nil
},
},
},
want: false,
wantErr: false,
},
{
name: "non deleted object without finalizer - update error",
args: args{
ctx: context.Background(),
obj: &ObjectMock{
GetDeletionTimestampFunc: func() *metav1.Time {
return nil
},
GetFinalizersFunc: func() []string {
return []string{}
},
GetResourceVersionFunc: func() string {
return "1"
},
},
writer: &WriterMock{
UpdateFunc: func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return errors.New("update error")
},
},
},
want: false,
wantErr: true,
},
{
name: "non deleted object with finalizer - nominal case",
args: args{
ctx: context.Background(),
obj: &ObjectMock{
GetDeletionTimestampFunc: func() *metav1.Time {
return nil
},
GetFinalizersFunc: func() []string {
return []string{finalizerName}
},
},
},
want: false,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := HandleFinalizer(tt.args.ctx, tt.args.obj, tt.args.writer, tt.args.finalizer)
if (err != nil) != tt.wantErr {
t.Errorf("HandleFinalizer() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("HandleFinalizer() = %v, want %v", got, tt.want)
}
})
}
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcd
import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"
)
// Storage wraps an etcd v3 client and exposes the key operations defined on it
// in this package.
type Storage struct {
// etcdClient is the client every operation is issued through.
etcdClient *clientv3.Client
}
// NewStorage returns a Storage that issues its operations through etcdClient.
// The client is stored as given; it is neither validated nor copied here.
func NewStorage(etcdClient *clientv3.Client) *Storage {
return &Storage{etcdClient: etcdClient}
}
// DeleteKeys deletes every etcd key that starts with prefix.
//
// An empty prefix is rejected with an error: combined with clientv3.WithPrefix
// an empty key selects the whole keyspace, so passing "" by mistake would wipe
// every key the client can reach.
//
// Any error reported by the etcd client is returned wrapped with the prefix
// that was being deleted; the underlying error stays reachable through
// errors.Is / errors.As.
func (s *Storage) DeleteKeys(ctx context.Context, prefix string) error {
	if prefix == "" {
		return fmt.Errorf("refusing to delete etcd keys with an empty prefix")
	}
	if _, err := s.etcdClient.Delete(ctx, prefix, clientv3.WithPrefix()); err != nil {
		return fmt.Errorf("deleting etcd keys with prefix %q: %w", prefix, err)
	}
	return nil
}
......@@ -78,6 +78,17 @@ type ServiceConfig struct {
Config Config `yaml:"config"`
}
// GetNamespace returns a pointer to the dynamo namespace configured for the
// service, or nil when the service has no dynamo configuration or the
// configured namespace is empty.
func (s ServiceConfig) GetNamespace() *string {
	dynamo := s.Config.Dynamo
	if dynamo != nil && dynamo.Namespace != "" {
		return &dynamo.Namespace
	}
	return nil
}
// GetDefaultDynamoNamespace returns the fallback dynamo namespace for a
// DynamoDeployment: the literal "dynamo-" followed by the deployment's name.
// ctx is currently unused.
func GetDefaultDynamoNamespace(ctx context.Context, dynamoDeployment *v1alpha1.DynamoDeployment) string {
	return "dynamo-" + dynamoDeployment.Name
}
func RetrieveDynamoNimDownloadURL(ctx context.Context, dynamoDeployment *v1alpha1.DynamoDeployment, recorder EventRecorder) (*string, *string, error) {
dynamoNimDownloadURL := ""
dynamoNimApiToken := ""
......@@ -222,7 +233,7 @@ func GetDynamoNIMConfig(ctx context.Context, dynamoDeployment *v1alpha1.DynamoDe
}
// generate DynamoNIMDeployment from config
func GenerateDynamoNIMDeployments(parentDynamoDeployment *v1alpha1.DynamoDeployment, config *DynamoNIMConfig) (map[string]*v1alpha1.DynamoNimDeployment, error) {
func GenerateDynamoNIMDeployments(ctx context.Context, parentDynamoDeployment *v1alpha1.DynamoDeployment, config *DynamoNIMConfig) (map[string]*v1alpha1.DynamoNimDeployment, error) {
dynamoServices := make(map[string]string)
deployments := make(map[string]*v1alpha1.DynamoNimDeployment)
for _, service := range config.Services {
......@@ -233,7 +244,13 @@ func GenerateDynamoNIMDeployments(parentDynamoDeployment *v1alpha1.DynamoDeploym
deployment.Spec.DynamoNim = strings.ReplaceAll(parentDynamoDeployment.Spec.DynamoNim, ":", "--")
deployment.Spec.ServiceName = service.Name
if service.Config.Dynamo != nil && service.Config.Dynamo.Enabled {
dynamoServices[service.Name] = fmt.Sprintf("%s/%s", service.Config.Dynamo.Name, service.Config.Dynamo.Namespace)
dynamoNamespace := service.Config.Dynamo.Namespace
if dynamoNamespace == "" {
// if no namespace is specified, use the default namespace
dynamoNamespace = GetDefaultDynamoNamespace(ctx, parentDynamoDeployment)
}
deployment.Spec.DynamoNamespace = &dynamoNamespace
dynamoServices[service.Name] = fmt.Sprintf("%s/%s", service.Config.Dynamo.Name, dynamoNamespace)
} else {
// dynamo is not enabled
if config.EntryService == service.Name {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment