main.go 24 KB
Newer Older
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech. Inc
3
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
4
5
6
7
8
9
10
11
12
13
14
15
16
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
17
 * Modifications Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES
18
19
20
21
22
 */

package main

import (
23
	"context"
24
25
	"crypto/tls"
	"flag"
26
	"fmt"
27
	"os"
28
	"time"
29
30
31

	// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
	// to ensure that exec-entrypoint and run can make use of them.
32

33
	corev1 "k8s.io/api/core/v1"
34
	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
35
36
	"k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/dynamic"
37
38
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
39
	_ "k8s.io/client-go/plugin/pkg/client/auth"
40
41
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/scale"
42
	k8sCache "k8s.io/client-go/tools/cache"
43
	"k8s.io/utils/ptr"
44
45
	"sigs.k8s.io/controller-runtime/pkg/cache"

46
47
	k8sruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
48
49
50
51
52
53
54
55
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/healthz"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
	"sigs.k8s.io/controller-runtime/pkg/webhook"

56
57
58
	lwsscheme "sigs.k8s.io/lws/client-go/clientset/versioned/scheme"
	volcanoscheme "volcano.sh/apis/pkg/client/clientset/versioned/scheme"

59
	semver "github.com/Masterminds/semver/v3"
60
61
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
	configvalidation "github.com/ai-dynamo/dynamo/deploy/operator/api/config/validation"
62
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
63
	nvidiacomv1beta1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1beta1"
64
65
66
67
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller"
	commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/modelendpoint"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/namespace_scope"
68
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
69
70
71
72
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/rbac"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/secret"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/secrets"
	internalwebhook "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook"
73
	webhookdefaulting "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook/defaulting"
74
	webhookvalidation "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook/validation"
75
	grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
76
	istioclientsetscheme "istio.io/client-go/pkg/clientset/versioned/scheme"
77
	gaiev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
78
79
80
81
	//+kubebuilder:scaffold:imports
)

var (
82
83
84
	crdScheme    = k8sruntime.NewScheme()
	setupLog     = ctrl.Log.WithName("setup")
	configScheme = k8sruntime.NewScheme()
85
86
)

87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// LoadAndValidateOperatorConfig reads the operator configuration file at path,
// decodes it into an OperatorConfiguration with a decoder built from
// configScheme, and validates the decoded object.
//
// Any defaulting is whatever the decoder applies for types registered on
// configScheme (NOTE(review): presumably defaulting functions are registered
// by configv1alpha1.AddToScheme; confirm there).
//
// It returns the decoded configuration on success. On failure it returns a nil
// configuration and an error that wraps the underlying cause (read error,
// decode error, or the aggregated validation errors), so callers can inspect
// it with errors.Is / errors.As.
func LoadAndValidateOperatorConfig(path string) (*configv1alpha1.OperatorConfiguration, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read config file %s: %w", path, err)
	}

	cfg := &configv1alpha1.OperatorConfiguration{}
	decoder := serializer.NewCodecFactory(configScheme).UniversalDecoder()
	if err := k8sruntime.DecodeInto(decoder, data, cfg); err != nil {
		return nil, fmt.Errorf("failed to decode config file %s: %w", path, err)
	}

	// Wrap the aggregate with %w (rather than flattening it to a string) so
	// the individual validation errors stay reachable through the error chain.
	// The rendered message is unchanged.
	if errs := configvalidation.ValidateOperatorConfiguration(cfg); len(errs) > 0 {
		return nil, fmt.Errorf("config validation failed: %w", errs.ToAggregate())
	}

	return cfg, nil
}

109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// createScalesGetter builds a scale.ScalesGetter from the manager's REST
// configuration. Discovery results are served through an in-memory cache that
// backs both the deferred REST mapper and the scale-kind resolver.
//
// It returns an error if either the Kubernetes clientset or the scale client
// cannot be constructed from the configuration.
func createScalesGetter(mgr ctrl.Manager) (scale.ScalesGetter, error) {
	restCfg := mgr.GetConfig()

	// A clientset is needed only for its discovery interface.
	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return nil, err
	}

	// One memory-cached discovery client is shared by the mapper and resolver.
	discoveryCache := memory.NewMemCacheClient(clientset.Discovery())
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryCache)
	kindResolver := scale.NewDiscoveryScaleKindResolver(discoveryCache)

	return scale.NewForConfig(restCfg, mapper, dynamic.LegacyAPIPathResolverFunc, kindResolver)
}

137
138
139
140
// initCRDSchemes registers every API group the operator reads or writes on
// crdScheme: the built-in client-go types, the operator's own v1alpha1 and
// v1beta1 APIs, and the LWS, Volcano, Grove, apiextensions, Istio and Gateway
// API Inference Extension types.
//
// Each registration is wrapped in utilruntime.Must, so a registration error
// panics at start-up instead of being silently ignored.
func initCRDSchemes() {
	utilruntime.Must(clientgoscheme.AddToScheme(crdScheme))

	utilruntime.Must(nvidiacomv1alpha1.AddToScheme(crdScheme))

	utilruntime.Must(nvidiacomv1beta1.AddToScheme(crdScheme))

	utilruntime.Must(lwsscheme.AddToScheme(crdScheme))

	utilruntime.Must(volcanoscheme.AddToScheme(crdScheme))

	utilruntime.Must(grovev1alpha1.AddToScheme(crdScheme))

	utilruntime.Must(apiextensionsv1.AddToScheme(crdScheme))

	utilruntime.Must(istioclientsetscheme.AddToScheme(crdScheme))

	utilruntime.Must(gaiev1.Install(crdScheme))
	//+kubebuilder:scaffold:scheme
}

158
159
160
161
func initConfigScheme() {
	utilruntime.Must(configv1alpha1.AddToScheme(configScheme))
}

162
//nolint:gocyclo
163
func main() {
164
165
166
167
	initCRDSchemes()
	initConfigScheme()

	var configFile string
168
	var operatorVersion string
169
	flag.StringVar(&configFile, "config", "", "Path to operator configuration file (required)")
170
171
	flag.StringVar(&operatorVersion, "operator-version", "unknown",
		"Version of the operator (used in lease holder identity)")
172
173
174
175
176
	opts := zap.Options{
		Development: true,
	}
	opts.BindFlags(flag.CommandLine)
	flag.Parse()
177
	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
178

179
180
	if configFile == "" {
		setupLog.Error(nil, "--config flag is required")
181
182
183
		os.Exit(1)
	}

184
185
186
187
	// Load, default, and validate operator configuration
	operatorCfg, err := LoadAndValidateOperatorConfig(configFile)
	if err != nil {
		setupLog.Error(err, "failed to load operator configuration", "configFile", configFile)
188
189
		os.Exit(1)
	}
190
	setupLog.Info("Operator configuration loaded successfully", "configFile", configFile)
191

192
193
194
195
	// Validate and normalize operator version to semver
	if _, err := semver.NewVersion(operatorVersion); err != nil {
		setupLog.Error(err, "operator-version is not valid semver",
			"provided", operatorVersion, "error", err.Error())
196
197
		os.Exit(1)
	}
198
	setupLog.Info("Operator version configured", "version", operatorVersion)
199

200
201
	// Initialize runtime config (will be populated after detection)
	runtimeConfig := &commonController.RuntimeConfig{}
202

203
	mainCtx := ctrl.SetupSignalHandler()
204
205
206
207
208
209
210
211
212
213
214
215
216

	// if the enable-http2 flag is false (the default), http/2 should be disabled
	// due to its vulnerabilities. More specifically, disabling http/2 will
	// prevent from being vulnerable to the HTTP/2 Stream Cancellation and
	// Rapid Reset CVEs. For more information see:
	// - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
	// - https://github.com/advisories/GHSA-4374-p667-p6c8
	disableHTTP2 := func(c *tls.Config) {
		setupLog.Info("disabling http/2")
		c.NextProtos = []string{"http/1.1"}
	}

	tlsOpts := []func(*tls.Config){}
217
	if !operatorCfg.Security.EnableHTTP2 {
218
219
220
221
		tlsOpts = append(tlsOpts, disableHTTP2)
	}

	webhookServer := webhook.NewServer(webhook.Options{
222
223
224
		Host:    operatorCfg.Server.Webhook.Host,
		Port:    operatorCfg.Server.Webhook.Port,
		CertDir: operatorCfg.Server.Webhook.CertDir,
225
226
227
		TLSOpts: tlsOpts,
	})

228
229
230
231
232
	metricsBindAddr := fmt.Sprintf("%s:%d", operatorCfg.Server.Metrics.BindAddress, operatorCfg.Server.Metrics.Port)
	healthProbeAddr := fmt.Sprintf(
		"%s:%d", operatorCfg.Server.HealthProbe.BindAddress, operatorCfg.Server.HealthProbe.Port,
	)

233
	mgrOpts := ctrl.Options{
234
		Scheme: crdScheme,
235
		Metrics: metricsserver.Options{
236
237
			BindAddress:   metricsBindAddr,
			SecureServing: operatorCfg.Server.Metrics.Secure,
238
239
			TLSOpts:       tlsOpts,
		},
240
		WebhookServer:           webhookServer,
241
242
243
244
		HealthProbeBindAddress:  healthProbeAddr,
		LeaderElection:          operatorCfg.LeaderElection.Enabled,
		LeaderElectionID:        operatorCfg.LeaderElection.ID,
		LeaderElectionNamespace: operatorCfg.LeaderElection.Namespace,
245
	}
246
247

	restrictedNamespace := operatorCfg.Namespace.Restricted
248
249
250
251
	if restrictedNamespace != "" {
		mgrOpts.Cache.DefaultNamespaces = map[string]cache.Config{
			restrictedNamespace: {},
		}
252
253
254
		setupLog.Info("Restricted namespace configured, launching in restricted mode", "namespace", restrictedNamespace)
	} else {
		setupLog.Info("No restricted namespace configured, launching in cluster-wide mode")
255
256
257
258
259
260
261
	}
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOpts)
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

262
263
264
265
	// Initialize observability metrics
	setupLog.Info("Initializing observability metrics")
	observability.InitMetrics()

266
267
268
269
270
271
272
273
	// Initialize namespace scope mechanism
	var leaseManager *namespace_scope.LeaseManager
	var leaseWatcher *namespace_scope.LeaseWatcher

	if restrictedNamespace != "" {
		// Namespace-restricted mode: Create and maintain namespace scope marker lease
		setupLog.Info("Creating namespace scope marker lease manager",
			"namespace", restrictedNamespace,
274
275
			"leaseDuration", operatorCfg.Namespace.Scope.LeaseDuration.Duration,
			"renewInterval", operatorCfg.Namespace.Scope.LeaseRenewInterval.Duration)
276
277
278
279
280

		leaseManager, err = namespace_scope.NewLeaseManager(
			mgr.GetConfig(),
			restrictedNamespace,
			operatorVersion,
281
282
			operatorCfg.Namespace.Scope.LeaseDuration.Duration,
			operatorCfg.Namespace.Scope.LeaseRenewInterval.Duration,
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
		)
		if err != nil {
			setupLog.Error(err, "unable to create namespace scope marker lease manager")
			os.Exit(1)
		}

		// Start the lease manager
		if err = leaseManager.Start(mainCtx); err != nil {
			setupLog.Error(err, "unable to start namespace scope marker lease manager")
			os.Exit(1)
		}

		// Monitor for fatal lease errors
		// If lease renewal fails repeatedly, we must exit to prevent split-brain
		go func() {
			select {
			case err := <-leaseManager.Errors():
				setupLog.Error(err, "FATAL: Lease manager encountered unrecoverable error, shutting down to prevent split-brain")
				os.Exit(1)
			case <-mainCtx.Done():
				// Normal shutdown, error channel monitoring no longer needed
				return
			}
		}()

		// Ensure lease is released on shutdown
		defer func() {
			shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			if err := leaseManager.Stop(shutdownCtx); err != nil {
				setupLog.Error(err, "failed to stop lease manager cleanly")
			}
		}()

		setupLog.Info("Namespace scope marker lease manager started successfully")
	} else {
		// Cluster-wide mode: Watch for namespace scope marker leases
		setupLog.Info("Setting up namespace scope marker lease watcher for cluster-wide mode")

		leaseWatcher, err = namespace_scope.NewLeaseWatcher(mgr.GetConfig())
		if err != nil {
			setupLog.Error(err, "unable to create namespace scope marker lease watcher")
			os.Exit(1)
		}

		// Start the lease watcher
		if err = leaseWatcher.Start(mainCtx); err != nil {
			setupLog.Error(err, "unable to start namespace scope marker lease watcher")
			os.Exit(1)
		}

		setupLog.Info("Namespace scope marker lease watcher started successfully")
335

336
337
		// Pass leaseWatcher to runtime config for namespace exclusion filtering
		runtimeConfig.ExcludedNamespaces = leaseWatcher
338
339
	}

340
341
	// Start resource counter background goroutine (after ExcludedNamespaces is set)
	setupLog.Info("Starting resource counter")
342
	go observability.StartResourceCounter(mainCtx, mgr.GetClient(), runtimeConfig.ExcludedNamespaces)
343

344
345
346
347
348
	// Detect orchestrators availability using discovery client.
	// Config overrides (*bool) take precedence over auto-detection:
	//   nil   = auto-detect (backward compatible default)
	//   false = forcibly disabled regardless of API availability
	//   true  = forcibly enabled; hard exit if API is not available (misconfiguration)
349
	setupLog.Info("Detecting Grove availability...")
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
	groveDetected := commonController.DetectGroveAvailability(mainCtx, mgr)
	switch {
	case operatorCfg.Orchestrators.Grove.Enabled == nil:
		runtimeConfig.GroveEnabled = groveDetected
	case *operatorCfg.Orchestrators.Grove.Enabled:
		if !groveDetected {
			setupLog.Error(nil, "Grove is explicitly enabled in config but the Grove API group was not detected in the cluster")
			os.Exit(1)
		}
		runtimeConfig.GroveEnabled = true
	default:
		setupLog.Info("Grove is explicitly disabled via config override")
		runtimeConfig.GroveEnabled = false
	}

365
	setupLog.Info("Detecting LWS availability...")
366
	lwsDetected := commonController.DetectLWSAvailability(mainCtx, mgr)
367
	setupLog.Info("Detecting Volcano availability...")
368
	volcanoDetected := commonController.DetectVolcanoAvailability(mainCtx, mgr)
369
	// LWS for multinode deployment usage depends on both LWS and Volcano availability
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
	switch {
	case operatorCfg.Orchestrators.LWS.Enabled == nil:
		runtimeConfig.LWSEnabled = lwsDetected && volcanoDetected
	case *operatorCfg.Orchestrators.LWS.Enabled:
		if !lwsDetected {
			setupLog.Error(nil, "LWS is explicitly enabled in config but the LWS API group was not detected in the cluster")
			os.Exit(1)
		}
		if !volcanoDetected {
			setupLog.Error(nil, "LWS is explicitly enabled in config but the Volcano API group was not detected in the cluster")
			os.Exit(1)
		}
		runtimeConfig.LWSEnabled = true
	default:
		setupLog.Info("LWS is explicitly disabled via config override")
		runtimeConfig.LWSEnabled = false
	}

388
389
	// Detect Kai-scheduler availability using discovery client
	setupLog.Info("Detecting Kai-scheduler availability...")
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
	kaiSchedulerDetected := commonController.DetectKaiSchedulerAvailability(mainCtx, mgr)
	switch {
	case operatorCfg.Orchestrators.KaiScheduler.Enabled == nil:
		runtimeConfig.KaiSchedulerEnabled = kaiSchedulerDetected
	case *operatorCfg.Orchestrators.KaiScheduler.Enabled:
		if !kaiSchedulerDetected {
			setupLog.Error(nil,
				"Kai-scheduler is explicitly enabled in config but the scheduling.run.ai API group was not detected in the cluster",
			)
			os.Exit(1)
		}
		runtimeConfig.KaiSchedulerEnabled = true
	default:
		setupLog.Info("Kai-scheduler is explicitly disabled via config override")
		runtimeConfig.KaiSchedulerEnabled = false
	}
406

407
	setupLog.Info("Detected orchestrators availability",
408
409
410
411
		"grove", runtimeConfig.GroveEnabled,
		"lws", runtimeConfig.LWSEnabled,
		"volcano", volcanoDetected,
		"kai-scheduler", runtimeConfig.KaiSchedulerEnabled,
412
413
	)

414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
	dockerSecretRetriever := secrets.NewDockerSecretIndexer(mgr.GetClient())
	// refresh whenever a secret is created/deleted/updated
	// Set up informer
	var factory informers.SharedInformerFactory
	if restrictedNamespace == "" {
		factory = informers.NewSharedInformerFactory(kubernetes.NewForConfigOrDie(mgr.GetConfig()), time.Hour*24)
	} else {
		factory = informers.NewFilteredSharedInformerFactory(
			kubernetes.NewForConfigOrDie(mgr.GetConfig()),
			time.Hour*24,
			restrictedNamespace,
			nil,
		)
	}
	secretInformer := factory.Core().V1().Secrets().Informer()
	// Start the informer factory
	go factory.Start(mainCtx.Done())
	// Wait for the initial sync
	if !k8sCache.WaitForCacheSync(mainCtx.Done(), secretInformer.HasSynced) {
		setupLog.Error(nil, "Failed to sync informer cache")
		os.Exit(1)
	}
	setupLog.Info("Secret informer cache synced and ready")
	_, err = secretInformer.AddEventHandler(k8sCache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			secret := obj.(*corev1.Secret)
			if secret.Type == corev1.SecretTypeDockerConfigJson {
				setupLog.Info("refreshing docker secrets index after secret creation...")
				err := dockerSecretRetriever.RefreshIndex(context.Background())
				if err != nil {
					setupLog.Error(err, "unable to refresh docker secrets index after secret creation")
				} else {
					setupLog.Info("docker secrets index refreshed after secret creation")
				}
			}
		},
		UpdateFunc: func(old, new interface{}) {
			newSecret := new.(*corev1.Secret)
			if newSecret.Type == corev1.SecretTypeDockerConfigJson {
				setupLog.Info("refreshing docker secrets index after secret update...")
				err := dockerSecretRetriever.RefreshIndex(context.Background())
				if err != nil {
					setupLog.Error(err, "unable to refresh docker secrets index after secret update")
				} else {
					setupLog.Info("docker secrets index refreshed after secret update")
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			secret := obj.(*corev1.Secret)
			if secret.Type == corev1.SecretTypeDockerConfigJson {
				setupLog.Info("refreshing docker secrets index after secret deletion...")
				err := dockerSecretRetriever.RefreshIndex(context.Background())
				if err != nil {
					setupLog.Error(err, "unable to refresh docker secrets index after secret deletion")
				} else {
					setupLog.Info("docker secrets index refreshed after secret deletion")
				}
			}
		},
	})
	if err != nil {
		setupLog.Error(err, "unable to add event handler to secret informer")
477
478
		os.Exit(1)
	}
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
	// launch a goroutine to refresh the docker secret indexer in any case every minute
	go func() {
		// Initial refresh
		if err := dockerSecretRetriever.RefreshIndex(context.Background()); err != nil {
			setupLog.Error(err, "initial docker secrets index refresh failed")
		}
		ticker := time.NewTicker(60 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-mainCtx.Done():
				return
			case <-ticker.C:
				setupLog.Info("refreshing docker secrets index...")
				if err := dockerSecretRetriever.RefreshIndex(mainCtx); err != nil {
					setupLog.Error(err, "unable to refresh docker secrets index")
				}
				setupLog.Info("docker secrets index refreshed")
			}
		}
	}()
500
501
502
503

	// Create MPI SSH SecretReplicator for cross-namespace secret replication
	mpiSecretReplicator := secret.NewSecretReplicator(
		mgr.GetClient(),
504
505
		operatorCfg.MPI.SSHSecretNamespace,
		operatorCfg.MPI.SSHSecretName,
506
507
	)

508
509
510
	if err = (&controller.DynamoComponentDeploymentReconciler{
		Client:                mgr.GetClient(),
		Recorder:              mgr.GetEventRecorderFor("dynamocomponentdeployment"),
511
512
		Config:                operatorCfg,
		RuntimeConfig:         runtimeConfig,
513
		DockerSecretRetriever: dockerSecretRetriever,
514
	}).SetupWithManager(mgr); err != nil {
515
		setupLog.Error(err, "unable to create controller", "controller", "DynamoComponentDeployment")
516
517
		os.Exit(1)
	}
518
519
520
521
522
523
524
	// Create scale client for Grove resource scaling
	scaleClient, err := createScalesGetter(mgr)
	if err != nil {
		setupLog.Error(err, "unable to create scale client")
		os.Exit(1)
	}

525
526
527
	// Initialize RBAC manager for cross-namespace resource management
	rbacManager := rbac.NewManager(mgr.GetClient())

528
	if err = (&controller.DynamoGraphDeploymentReconciler{
529
530
		Client:                mgr.GetClient(),
		Recorder:              mgr.GetEventRecorderFor("dynamographdeployment"),
531
532
		Config:                operatorCfg,
		RuntimeConfig:         runtimeConfig,
533
		DockerSecretRetriever: dockerSecretRetriever,
534
		ScaleClient:           scaleClient,
535
		MPISecretReplicator:   mpiSecretReplicator,
536
		RBACManager:           rbacManager,
537
	}).SetupWithManager(mgr); err != nil {
538
		setupLog.Error(err, "unable to create controller", "controller", "DynamoGraphDeployment")
539
540
		os.Exit(1)
	}
541

542
	if err = (&controller.DynamoGraphDeploymentScalingAdapterReconciler{
543
544
545
546
547
		Client:        mgr.GetClient(),
		Scheme:        mgr.GetScheme(),
		Recorder:      mgr.GetEventRecorderFor("dgdscalingadapter"),
		Config:        operatorCfg,
		RuntimeConfig: runtimeConfig,
548
549
550
551
552
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "DGDScalingAdapter")
		os.Exit(1)
	}

553
	if err = (&controller.DynamoGraphDeploymentRequestReconciler{
554
		Client:        mgr.GetClient(),
555
		APIReader:     mgr.GetAPIReader(),
556
557
558
559
		Recorder:      mgr.GetEventRecorderFor("dynamographdeploymentrequest"),
		Config:        operatorCfg,
		RuntimeConfig: runtimeConfig,
		RBACManager:   rbacManager,
560
561
562
563
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "DynamoGraphDeploymentRequest")
		os.Exit(1)
	}
564
565
566
567
568

	if err = (&controller.DynamoModelReconciler{
		Client:         mgr.GetClient(),
		Recorder:       mgr.GetEventRecorderFor("dynamomodel"),
		EndpointClient: modelendpoint.NewClient(),
569
570
		Config:         operatorCfg,
		RuntimeConfig:  runtimeConfig,
571
572
573
574
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "DynamoModel")
		os.Exit(1)
	}
575

576
	if err = (&controller.CheckpointReconciler{
577
578
579
580
		Client:        mgr.GetClient(),
		Config:        operatorCfg,
		RuntimeConfig: runtimeConfig,
		Recorder:      mgr.GetEventRecorderFor("checkpoint"),
581
582
583
584
585
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "DynamoCheckpoint")
		os.Exit(1)
	}

586
	// Configure webhooks with lease-based namespace exclusion
587
	isClusterWide := operatorCfg.Namespace.Restricted == ""
588
589
	if isClusterWide {
		setupLog.Info("Configuring webhooks with lease-based namespace exclusion for cluster-wide mode")
590
		internalwebhook.SetExcludedNamespaces(runtimeConfig.ExcludedNamespaces)
591
592
	} else {
		setupLog.Info("Configuring webhooks for namespace-restricted mode (no lease checking)",
593
			"restrictedNamespace", operatorCfg.Namespace.Restricted)
594
595
		internalwebhook.SetExcludedNamespaces(nil)
	}
596

597
598
	// Register validation webhook handlers
	setupLog.Info("Registering validation webhooks")
599

600
601
602
603
604
	dcdHandler := webhookvalidation.NewDynamoComponentDeploymentHandler()
	if err = dcdHandler.RegisterWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to register webhook", "webhook", "DynamoComponentDeployment")
		os.Exit(1)
	}
605

606
607
608
609
610
	dgdHandler := webhookvalidation.NewDynamoGraphDeploymentHandler(mgr)
	if err = dgdHandler.RegisterWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to register webhook", "webhook", "DynamoGraphDeployment")
		os.Exit(1)
	}
611

612
613
614
615
616
	dmHandler := webhookvalidation.NewDynamoModelHandler()
	if err = dmHandler.RegisterWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to register webhook", "webhook", "DynamoModel")
		os.Exit(1)
	}
617

618
619
620
	dgdrHandler := webhookvalidation.NewDynamoGraphDeploymentRequestHandler(
		isClusterWide, ptr.Deref(operatorCfg.GPU.DiscoveryEnabled, true),
	)
621
622
623
624
	if err = dgdrHandler.RegisterWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to register webhook", "webhook", "DynamoGraphDeploymentRequest")
		os.Exit(1)
	}
625

626
	// Register the DGDR conversion webhook using the hub version (v1beta1).
627
	if err = ctrl.NewWebhookManagedBy(mgr).
628
		For(&nvidiacomv1beta1.DynamoGraphDeploymentRequest{}).
629
630
631
632
		Complete(); err != nil {
		setupLog.Error(err, "unable to register conversion webhook", "webhook", "DynamoGraphDeploymentRequest-conversion")
		os.Exit(1)
	}
633

634
	setupLog.Info("Validation webhooks registered successfully")
635

636
637
	// Register defaulting (mutating) webhook handlers
	setupLog.Info("Registering defaulting webhooks")
638

639
640
641
642
	dgdDefaulter := webhookdefaulting.NewDGDDefaulter(operatorVersion)
	if err = dgdDefaulter.RegisterWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to register webhook", "webhook", "DynamoGraphDeployment-defaulting")
		os.Exit(1)
643
	}
644

645
646
647
648
649
650
	dgdrDefaulter := webhookdefaulting.NewDGDRDefaulter(operatorVersion)
	if err = dgdrDefaulter.RegisterWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to register webhook", "webhook", "DynamoGraphDeploymentRequest-defaulting")
		os.Exit(1)
	}

651
	setupLog.Info("Defaulting webhooks registered successfully")
652
653
654
655
656
657
658
659
660
661
662
663
	//+kubebuilder:scaffold:builder

	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up health check")
		os.Exit(1)
	}
	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up ready check")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
664
	if err := mgr.Start(mainCtx); err != nil {
665
666
667
668
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}