/*
 * SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech. Inc
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Modifications Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES
 */

package controller

import (
	"context"
	"fmt"
25
	"maps"
26
	"slices"
27
28
29
30
31
32
33
34
35
	"time"

	appsv1 "k8s.io/api/apps/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"emperror.dev/errors"
36
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
37
	"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
38
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
39
40
41
42
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/common"
	commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
	commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
43
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
44
45
46
47
48
49
50
51
52
	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
53
	"sigs.k8s.io/controller-runtime/pkg/event"
54
55
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
56
57
58

	leaderworkersetv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"
	volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
59
60
61
)

// Names and annotation keys used by the DynamoComponentDeployment controller.
const (
	// DefaultClusterName and DefaultServiceAccountName are both the literal "default".
	DefaultClusterName        = "default"
	DefaultServiceAccountName = "default"

	// Annotation keys (nvidia.com/ prefix) that hold the deployment strategy and the
	// rolling-update max-surge / max-unavailable settings.
	KubeAnnotationDeploymentStrategy                    = "nvidia.com/deployment-strategy"
	KubeAnnotationDeploymentRollingUpdateMaxSurge       = "nvidia.com/deployment-rolling-update-max-surge"
	KubeAnnotationDeploymentRollingUpdateMaxUnavailable = "nvidia.com/deployment-rolling-update-max-unavailable"

	// SchedulerNameVolcano is the scheduler name set on LeaderWorkerSet leader and worker pods.
	SchedulerNameVolcano = "volcano"
)

70
71
// DynamoComponentDeploymentReconciler reconciles a DynamoComponentDeployment object
type DynamoComponentDeploymentReconciler struct {
72
	client.Client
73
	Recorder              record.EventRecorder
74
75
	Config                *configv1alpha1.OperatorConfiguration
	RuntimeConfig         *commonController.RuntimeConfig
76
	DockerSecretRetriever dockerSecretRetriever
77
78
}

79
80
81
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponentdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponentdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponentdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints,verbs=get;list

//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingressclasses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.istio.io,resources=virtualservices,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;create;delete

// +kubebuilder:rbac:groups=scheduling.volcano.sh,resources=podgroups,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets,verbs=get;list;watch;create;update;patch;delete

100
101
102
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
103
// the DynamoComponentDeployment object against the actual cluster state, and then
104
105
106
107
108
109
110
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.2/pkg/reconcile
//
//nolint:gocyclo,nakedret
111
func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
112
113
	logs := log.FromContext(ctx)

114
115
	dynamoComponentDeployment := &v1alpha1.DynamoComponentDeployment{}
	err = r.Get(ctx, req.NamespacedName, dynamoComponentDeployment)
116
117
118
119
	if err != nil {
		if k8serrors.IsNotFound(err) {
			// Object not found, return.  Created objects are automatically garbage collected.
			// For additional cleanup logic use finalizers.
120
			logs.Info("DynamoComponentDeployment resource not found. Ignoring since object must be deleted.")
121
122
123
124
			err = nil
			return
		}
		// Error reading the object - requeue the request.
125
		logs.Error(err, "Failed to get DynamoComponentDeployment.")
126
127
128
		return
	}

129
	logs = logs.WithValues("dynamoComponentDeployment", dynamoComponentDeployment.Name, "namespace", dynamoComponentDeployment.Namespace)
130

131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
	// Setup defer to handle errors and update status
	defer func() {
		if err == nil {
			return
		}
		reconcileErr := err
		logs.Error(reconcileErr, "Failed to reconcile DynamoComponentDeployment.")
		r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeWarning, "ReconcileError",
			"Failed to reconcile DynamoComponentDeployment: %v", reconcileErr)
		if _, statusErr := r.setStatusConditions(ctx, req,
			metav1.Condition{
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
				Status:  metav1.ConditionFalse,
				Reason:  "Reconciling",
				Message: fmt.Sprintf("Failed to reconcile DynamoComponentDeployment: %v", reconcileErr),
			},
		); statusErr != nil {
			logs.Error(statusErr, "Failed to update DynamoComponentDeployment status after reconcile error")
		}
	}()

152
	deleted, err := commonController.HandleFinalizer(ctx, dynamoComponentDeployment, r.Client, r)
153
154
155
156
157
158
159
160
	if err != nil {
		logs.Error(err, "Failed to handle finalizer")
		return ctrl.Result{}, err
	}
	if deleted {
		return ctrl.Result{}, nil
	}

161
162
163
164
165
	if len(dynamoComponentDeployment.Status.Conditions) == 0 {
		logs.Info("Starting to reconcile DynamoComponentDeployment")
		logs.Info("Initializing DynamoComponentDeployment status")
		r.Recorder.Event(dynamoComponentDeployment, corev1.EventTypeNormal, "Reconciling", "Starting to reconcile DynamoComponentDeployment")
		dynamoComponentDeployment, err = r.setStatusConditions(ctx, req,
166
			metav1.Condition{
167
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
168
169
				Status:  metav1.ConditionUnknown,
				Reason:  "Reconciling",
170
				Message: "Starting to reconcile DynamoComponentDeployment",
171
172
			},
			metav1.Condition{
173
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeDynamoComponentReady,
174
175
				Status:  metav1.ConditionUnknown,
				Reason:  "Reconciling",
176
				Message: "Starting to reconcile DynamoComponentDeployment",
177
178
179
180
181
182
183
			},
		)
		if err != nil {
			return
		}
	}

184
	// Create the appropriate workload resource based on deployment type
185
	var componentReconcileResult ComponentReconcileResult
186
	if r.RuntimeConfig.LWSEnabled && dynamoComponentDeployment.IsMultinode() {
187
		componentReconcileResult, err = r.reconcileLeaderWorkerSetResources(ctx, dynamoComponentDeployment)
188
	} else {
189
		componentReconcileResult, err = r.reconcileDeploymentResources(ctx, dynamoComponentDeployment)
190
	}
191
192
193
194
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to reconcile the resources: %w", err)
	}
	modified := componentReconcileResult.modified
195
196

	// create or update api-server service
197
	serviceModified, err := r.createOrUpdateOrDeleteServices(ctx, generateResourceOption{
198
		dynamoComponentDeployment: dynamoComponentDeployment,
199
200
	})
	if err != nil {
201
		return ctrl.Result{}, fmt.Errorf("failed to create or update the service: %w", err)
202
203
	}

204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
	// create or update headless service for model endpoint discovery
	componentMap := map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
		dynamoComponentDeployment.Name: &dynamoComponentDeployment.Spec.DynamoComponentDeploymentSharedSpec,
	}
	if err := dynamo.ReconcileModelServicesForComponents(
		ctx,
		r,
		dynamoComponentDeployment,
		componentMap,
		dynamoComponentDeployment.Namespace,
	); err != nil {
		logs.Error(err, "Failed to reconcile model service")
		return ctrl.Result{}, err
	}

219
	// create or update api-server ingresses
220
	ingressModified, err := r.createOrUpdateOrDeleteIngress(ctx, generateResourceOption{
221
		dynamoComponentDeployment: dynamoComponentDeployment,
222
	})
223
	if err != nil {
224
		return ctrl.Result{}, fmt.Errorf("failed to create or update the ingress: %w", err)
225
226
	}

227
	if serviceModified || ingressModified {
228
229
230
231
		modified = true
	}

	if !modified {
232
		r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeNormal, "UpdateDynamoGraphDeployment", "No changes to dynamo deployment %s", dynamoComponentDeployment.Name)
233
234
235
	}

	logs.Info("Finished reconciling.")
236
	r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeNormal, "Update", "All resources updated!")
237

238
239
240
	err = r.setStatusConditionAndServiceReplicaStatus(ctx, dynamoComponentDeployment, componentReconcileResult)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to set status condition and service replica status: %w", err)
241
242
	}

243
244
245
	return
}

246
247
248
249
250
type ComponentReconcileResult struct {
	modified             bool
	status               metav1.ConditionStatus
	reason               string
	message              string
251
	serviceReplicaStatus *v1alpha1.ServiceReplicaStatus
252
253
254
255
256
257
258
259
260
261
262
}

func (r *DynamoComponentDeploymentReconciler) reconcileDeploymentResources(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) (ComponentReconcileResult, error) {
	logger := log.FromContext(ctx)
	deploymentModified, deployment, err := r.createOrUpdateOrDeleteDeployments(ctx, generateResourceOption{
		dynamoComponentDeployment: dynamoComponentDeployment,
	})
	if err != nil {
		return ComponentReconcileResult{}, fmt.Errorf("failed to create or update the deployment: %w", err)
	}

263
264
265
266
267
268
269
270
271
272
	logger.V(1).Info("Deployment sync completed",
		"deploymentModified", deploymentModified,
		"deploymentName", deployment.Name,
		"deploymentGeneration", deployment.Generation,
		"deploymentObservedGeneration", deployment.Status.ObservedGeneration,
		"deploymentReplicas", deployment.Status.Replicas,
		"deploymentUpdatedReplicas", deployment.Status.UpdatedReplicas,
		"deploymentAvailableReplicas", deployment.Status.AvailableReplicas,
		"deploymentReadyReplicas", deployment.Status.ReadyReplicas)

273
	serviceReplicaStatus := &v1alpha1.ServiceReplicaStatus{
274
275
		ComponentKind:     v1alpha1.ComponentKindDeployment,
		ComponentName:     deployment.Name,
276
		ComponentNames:    []string{deployment.Name},
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
		Replicas:          deployment.Status.Replicas,
		UpdatedReplicas:   deployment.Status.UpdatedReplicas,
		ReadyReplicas:     &deployment.Status.ReadyReplicas,
		AvailableReplicas: &deployment.Status.AvailableReplicas,
	}

	if IsDeploymentReady(deployment) {
		return ComponentReconcileResult{
			modified:             deploymentModified,
			status:               metav1.ConditionTrue,
			reason:               "DeploymentReady",
			message:              "Deployment is ready",
			serviceReplicaStatus: serviceReplicaStatus,
		}, nil
	}
	return ComponentReconcileResult{
		modified:             deploymentModified,
		status:               metav1.ConditionFalse,
		reason:               "DeploymentNotReady",
		message:              "Deployment is not ready",
		serviceReplicaStatus: serviceReplicaStatus,
	}, nil
}

func (r *DynamoComponentDeploymentReconciler) reconcileLeaderWorkerSetResources(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) (ComponentReconcileResult, error) {
	logger := log.FromContext(ctx)

	desiredReplicas := int32(1)
	if dynamoComponentDeployment.Spec.Replicas != nil {
		desiredReplicas = *dynamoComponentDeployment.Spec.Replicas
	}

	anyModified := false
	leaderWorkerSets := make([]*leaderworkersetv1.LeaderWorkerSet, 0, desiredReplicas)
	for i := range int(desiredReplicas) {
		volcanoPodGroupModified, _, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*volcanov1beta1.PodGroup, bool, error) {
			return r.generateVolcanoPodGroup(ctx, generateResourceOption{
314
315
				dynamoComponentDeployment: dynamoComponentDeployment,
				instanceID:                &i,
316
317
318
319
320
321
322
323
			})
		})
		if err != nil {
			return ComponentReconcileResult{}, fmt.Errorf("failed to sync the PodGroup: %w", err)
		}

		leaderWorkerSetModified, lwsObj, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*leaderworkersetv1.LeaderWorkerSet, bool, error) {
			return r.generateLeaderWorkerSet(ctx, generateResourceOption{
324
325
				dynamoComponentDeployment: dynamoComponentDeployment,
				instanceID:                &i,
326
327
328
329
330
331
332
333
334
335
336
337
338
339
			})
		})
		if err != nil {
			return ComponentReconcileResult{}, fmt.Errorf("failed to sync the LeaderWorkerSet: %w", err)
		}

		if leaderWorkerSetModified || volcanoPodGroupModified {
			anyModified = true
		}
		leaderWorkerSets = append(leaderWorkerSets, lwsObj)
	}

	// Clean up any excess LeaderWorkerSets (if replicas were decreased)
	for i := int(desiredReplicas); ; i++ {
340
		nextLWSName := lwsInstanceName(dynamoComponentDeployment, i)
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
		lwsToDelete := &leaderworkersetv1.LeaderWorkerSet{}
		err := r.Get(ctx, types.NamespacedName{
			Name:      nextLWSName,
			Namespace: dynamoComponentDeployment.Namespace,
		}, lwsToDelete)

		if err != nil {
			if k8serrors.IsNotFound(err) {
				break
			}
			return ComponentReconcileResult{}, fmt.Errorf("failed to get the LeaderWorkerSet for deletion: %w", err)
		}

		err = r.Delete(ctx, lwsToDelete)
		if err != nil {
			return ComponentReconcileResult{}, fmt.Errorf("failed to delete the LeaderWorkerSet: %w", err)
		}

		podGroupName := nextLWSName
		podGroupToDelete := &volcanov1beta1.PodGroup{}
		err = r.Get(ctx, types.NamespacedName{
			Name:      podGroupName,
			Namespace: dynamoComponentDeployment.Namespace,
		}, podGroupToDelete)

		if err != nil {
			if !k8serrors.IsNotFound(err) {
				logger.Error(err, "Failed to get PodGroup for deletion", "podGroupName", podGroupName)
			}
		} else {
			err = r.Delete(ctx, podGroupToDelete)
			if err != nil {
				logger.Error(err, "Failed to delete PodGroup", "podGroupName", podGroupName)
			}
		}

		anyModified = true
	}
379
380

	allReady := true
381
	lwsReplicaStatuses := []v1alpha1.ServiceReplicaStatus{}
382
383
384
385
	for _, leaderWorkerSet := range leaderWorkerSets {
		if !IsLeaderWorkerSetReady(leaderWorkerSet) {
			allReady = false
		}
386
		lwsReplicaStatuses = append(lwsReplicaStatuses, getLeaderWorkerSetReplicasStatus(leaderWorkerSet))
387
388
389
	}

	if allReady {
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
		return ComponentReconcileResult{
			modified:             anyModified,
			status:               metav1.ConditionTrue,
			reason:               "AllLeaderWorkerSetsReady",
			message:              "All LeaderWorkerSets are ready",
			serviceReplicaStatus: combineLWSReplicaStatuses(lwsReplicaStatuses),
		}, nil
	}
	return ComponentReconcileResult{
		modified:             anyModified,
		status:               metav1.ConditionFalse,
		reason:               "SomeLeaderWorkerSetsNotReady",
		message:              "Some LeaderWorkerSets are not ready",
		serviceReplicaStatus: combineLWSReplicaStatuses(lwsReplicaStatuses),
	}, nil

}

func (r *DynamoComponentDeploymentReconciler) setStatusConditionAndServiceReplicaStatus(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, componentReconcileResult ComponentReconcileResult) error {
409
	availableCondition := metav1.Condition{
410
411
412
413
414
415
		Type:    v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
		Status:  componentReconcileResult.status,
		Reason:  componentReconcileResult.reason,
		Message: componentReconcileResult.message,
	}

416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
	var componentReadyReason, componentReadyMessage string
	if componentReconcileResult.status == metav1.ConditionTrue {
		componentReadyReason = "ComponentReady"
		componentReadyMessage = "DynamoComponent is ready"
	} else {
		componentReadyReason = "ComponentNotReady"
		componentReadyMessage = "DynamoComponent is not ready"
	}

	componentReadyCondition := metav1.Condition{
		Type:    v1alpha1.DynamoGraphDeploymentConditionTypeDynamoComponentReady,
		Status:  componentReconcileResult.status,
		Reason:  componentReadyReason,
		Message: componentReadyMessage,
	}

	meta.SetStatusCondition(&dynamoComponentDeployment.Status.Conditions, availableCondition)
	meta.SetStatusCondition(&dynamoComponentDeployment.Status.Conditions, componentReadyCondition)
434
	dynamoComponentDeployment.Status.Service = componentReconcileResult.serviceReplicaStatus
435
	dynamoComponentDeployment.Status.ObservedGeneration = dynamoComponentDeployment.Generation
436
437
438
439

	err := r.Status().Update(ctx, dynamoComponentDeployment)
	if err != nil {
		return fmt.Errorf("failed to update DynamoComponentDeployment status: %w", err)
440
	}
441
442
443
444
445
446
447
	return nil
}

func getLeaderWorkerSetReplicasStatus(leaderWorkerSet *leaderworkersetv1.LeaderWorkerSet) v1alpha1.ServiceReplicaStatus {
	return v1alpha1.ServiceReplicaStatus{
		ComponentKind:   v1alpha1.ComponentKindLeaderWorkerSet,
		ComponentName:   leaderWorkerSet.Name,
448
		ComponentNames:  []string{leaderWorkerSet.Name},
449
450
451
452
453
454
		Replicas:        leaderWorkerSet.Status.Replicas,
		UpdatedReplicas: leaderWorkerSet.Status.UpdatedReplicas,
		ReadyReplicas:   &leaderWorkerSet.Status.ReadyReplicas,
	}
}

455
// combineLWSReplicaStatuses merges per-LeaderWorkerSet replica statuses into one.
//
// Replicas, UpdatedReplicas and ReadyReplicas (nil counted as 0) are summed over all
// entries, and ComponentNames are concatenated and sorted. Every other field
// (ComponentKind, ComponentName, AvailableReplicas, ...) is taken from the first
// entry. An empty input yields nil.
func combineLWSReplicaStatuses(serviceReplicaStatuses []v1alpha1.ServiceReplicaStatus) *v1alpha1.ServiceReplicaStatus {
	if len(serviceReplicaStatuses) == 0 {
		return nil
	}

	combined := serviceReplicaStatuses[0]
	combined.Replicas = 0
	combined.UpdatedReplicas = 0
	var readyTotal int32
	names := make([]string, 0, len(serviceReplicaStatuses))

	for idx := range serviceReplicaStatuses {
		entry := &serviceReplicaStatuses[idx]
		combined.Replicas += entry.Replicas
		combined.UpdatedReplicas += entry.UpdatedReplicas
		if entry.ReadyReplicas != nil {
			readyTotal += *entry.ReadyReplicas
		}
		names = append(names, entry.ComponentNames...)
	}

	slices.Sort(names)
	combined.ComponentNames = names
	combined.ReadyReplicas = &readyTotal
	return &combined
}

// IsLeaderWorkerSetReady reports whether a LeaderWorkerSet is fully ready and available.
//
// A nil LeaderWorkerSet is never ready. A LeaderWorkerSet that wants zero replicas
// (an unset Spec.Replicas counts as 1) is always ready. Otherwise it needs at least
// the desired number of ready replicas, and its first condition of type
// LeaderWorkerSetAvailable must have status True.
func IsLeaderWorkerSetReady(leaderWorkerSet *leaderworkersetv1.LeaderWorkerSet) bool {
	if leaderWorkerSet == nil {
		return false
	}

	wanted := int32(1)
	if replicas := leaderWorkerSet.Spec.Replicas; replicas != nil {
		wanted = *replicas
	}

	switch {
	case wanted == 0:
		return true
	case leaderWorkerSet.Status.ReadyReplicas < wanted:
		return false
	default:
		return meta.IsStatusConditionTrue(leaderWorkerSet.Status.Conditions, string(leaderworkersetv1.LeaderWorkerSetAvailable))
	}
}

// generateVolcanoPodGroup builds the Volcano PodGroup for LWS instance *opt.instanceID.
//
// The PodGroup is named lwsInstanceName(dcd, id) in the component's namespace, is
// labelled "instance-id=<id>", and sets MinMember to the component's node count.
// opt.instanceID must be non-nil and non-negative. The boolean result is always
// false (NOTE(review): presumably SyncResource's "delete" flag — confirm).
func (r *DynamoComponentDeploymentReconciler) generateVolcanoPodGroup(ctx context.Context, opt generateResourceOption) (*volcanov1beta1.PodGroup, bool, error) {
	log.FromContext(ctx).Info("Generating Volcano PodGroup")

	if opt.instanceID == nil {
		return nil, false, errors.New("generateVolcanoPodGroup: instanceID cannot be nil")
	}
	id := *opt.instanceID
	if id < 0 {
		return nil, false, fmt.Errorf("generateVolcanoPodGroup: instanceID cannot be negative, got %d", id)
	}

	dcd := opt.dynamoComponentDeployment
	podGroup := &volcanov1beta1.PodGroup{
		ObjectMeta: metav1.ObjectMeta{
			Name:      lwsInstanceName(dcd, id),
			Namespace: dcd.Namespace,
			Labels: map[string]string{
				"instance-id": fmt.Sprintf("%d", id),
			},
		},
		Spec: volcanov1beta1.PodGroupSpec{
			MinMember: dcd.GetNumberOfNodes(),
		},
	}
	return podGroup, false, nil
}

// generateLeaderPodTemplateSpec builds the leader pod template for LWS instance instanceID.
//
// It starts from the component's leader-role pod template and merges in labels. It
// then sets role=leader and instance-id=<instanceID>, removes the Dynamo selector
// label, sets the "scheduling.k8s.io/group-name" annotation to kubeName, switches
// the scheduler to Volcano, and validates the main container (see checkMainContainer).
func (r *DynamoComponentDeploymentReconciler) generateLeaderPodTemplateSpec(ctx context.Context, opt generateResourceOption, kubeName string, labels map[string]string, instanceID int) (*corev1.PodTemplateSpec, error) {
	leaderPodTemplateSpec, err := r.generatePodTemplateSpec(ctx, opt, dynamo.RoleLeader)
	if err != nil {
		return nil, errors.Wrap(err, "failed to generate leader pod template")
	}

	// Writing into a nil map panics, so make sure the label map exists first.
	if leaderPodTemplateSpec.ObjectMeta.Labels == nil {
		leaderPodTemplateSpec.ObjectMeta.Labels = make(map[string]string, len(labels)+2)
	}
	maps.Copy(leaderPodTemplateSpec.ObjectMeta.Labels, labels)
	leaderPodTemplateSpec.ObjectMeta.Labels["role"] = "leader"
	leaderPodTemplateSpec.ObjectMeta.Labels["instance-id"] = fmt.Sprintf("%d", instanceID)
	delete(leaderPodTemplateSpec.ObjectMeta.Labels, commonconsts.KubeLabelDynamoSelector)

	if leaderPodTemplateSpec.ObjectMeta.Annotations == nil {
		leaderPodTemplateSpec.ObjectMeta.Annotations = make(map[string]string)
	}
	leaderPodTemplateSpec.ObjectMeta.Annotations["scheduling.k8s.io/group-name"] = kubeName

	leaderPodTemplateSpec.Spec.SchedulerName = SchedulerNameVolcano

	if err := checkMainContainer(&leaderPodTemplateSpec.Spec); err != nil {
		return nil, errors.Wrap(err, "generateLeaderPodTemplateSpec: failed to check main container")
	}

	return leaderPodTemplateSpec, nil
}

576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
// checkMainContainer validates a pod spec destined for a LeaderWorkerSet.
//
// The spec must contain at least one container and a container named
// commonconsts.MainContainerName. The first such container must have a non-empty
// Command and non-empty Args.
func checkMainContainer(spec *corev1.PodSpec) error {
	if len(spec.Containers) == 0 {
		return errors.New("No containers found in pod spec")
	}

	idx := slices.IndexFunc(spec.Containers, func(c corev1.Container) bool {
		return c.Name == commonconsts.MainContainerName
	})
	if idx < 0 {
		return errors.New("main container not found in pod spec")
	}

	mainContainer := &spec.Containers[idx]
	switch {
	case len(mainContainer.Command) == 0:
		return errors.New("container Command cannot be nil for LWS pod")
	case len(mainContainer.Args) == 0:
		return errors.New("container Args cannot be empty for LWS pod")
	}
	return nil
}

607
// generateWorkerPodTemplateSpec builds the worker pod template for LWS instance instanceID.
//
// It starts from the component's worker-role pod template and merges in labels. It
// then sets role=worker and instance-id=<instanceID>, removes the Dynamo selector
// label, switches the scheduler to Volcano, and sets the
// "scheduling.k8s.io/group-name" annotation to kubeName. Finally it validates the
// main container and requires the component to declare a GPU limit.
func (r *DynamoComponentDeploymentReconciler) generateWorkerPodTemplateSpec(ctx context.Context, opt generateResourceOption, kubeName string, labels map[string]string, instanceID int) (*corev1.PodTemplateSpec, error) {
	workerPodTemplateSpec, err := r.generatePodTemplateSpec(ctx, opt, dynamo.RoleWorker)
	if err != nil {
		return nil, errors.Wrap(err, "failed to generate worker pod template")
	}

	// Writing into a nil map panics, so make sure the label map exists first.
	if workerPodTemplateSpec.ObjectMeta.Labels == nil {
		workerPodTemplateSpec.ObjectMeta.Labels = make(map[string]string, len(labels)+2)
	}
	maps.Copy(workerPodTemplateSpec.ObjectMeta.Labels, labels)
	workerPodTemplateSpec.ObjectMeta.Labels["role"] = "worker"
	workerPodTemplateSpec.ObjectMeta.Labels["instance-id"] = fmt.Sprintf("%d", instanceID)
	delete(workerPodTemplateSpec.ObjectMeta.Labels, commonconsts.KubeLabelDynamoSelector)

	workerPodTemplateSpec.Spec.SchedulerName = SchedulerNameVolcano

	if workerPodTemplateSpec.ObjectMeta.Annotations == nil {
		workerPodTemplateSpec.ObjectMeta.Annotations = make(map[string]string)
	}
	workerPodTemplateSpec.ObjectMeta.Annotations["scheduling.k8s.io/group-name"] = kubeName

	if err := checkMainContainer(&workerPodTemplateSpec.Spec); err != nil {
		return nil, errors.Wrap(err, "generateWorkerPodTemplateSpec: failed to check LWS worker main container")
	}

	resources := opt.dynamoComponentDeployment.Spec.Resources
	if resources == nil || resources.Limits == nil || resources.Limits.GPU == "" {
		return nil, fmt.Errorf("generateWorkerPodTemplateSpec: GPU limit is not set for LWS worker pod")
	}

	return workerPodTemplateSpec, nil
}

// generateLeaderWorkerSet builds the LeaderWorkerSet for LWS instance *opt.instanceID
// of a multinode DynamoComponentDeployment.
//
// The object is named lwsInstanceName(dcd, id) in the component's namespace. It
// carries the component's kube labels plus "instance-id", and has exactly one
// replica; more replicas are served by additional instances. It uses the
// LeaderCreated startup policy and a group size equal to the component's node count.
// opt.instanceID must be non-nil and non-negative. The boolean result is always false.
func (r *DynamoComponentDeploymentReconciler) generateLeaderWorkerSet(ctx context.Context, opt generateResourceOption) (*leaderworkersetv1.LeaderWorkerSet, bool, error) {
	logs := log.FromContext(ctx)
	logs.Info("Generating LeaderWorkerSet")

	if opt.instanceID == nil {
		return nil, false, errors.New("generateLeaderWorkerSet: instanceID cannot be nil")
	}
	instanceID := *opt.instanceID

	if instanceID < 0 {
		return nil, false, fmt.Errorf("generateLeaderWorkerSet: instanceID cannot be negative, got %d", instanceID)
	}

	kubeName := lwsInstanceName(opt.dynamoComponentDeployment, instanceID)
	kubeNs := opt.dynamoComponentDeployment.Namespace

	// Work on a private copy: "instance-id" is added below, and the map returned
	// by getKubeLabels is not guaranteed to be freshly allocated
	// (NOTE(review): confirm whether it can alias spec-owned labels).
	labels := maps.Clone(r.getKubeLabels(opt.dynamoComponentDeployment))
	if labels == nil {
		labels = make(map[string]string, 1)
	}
	labels["instance-id"] = fmt.Sprintf("%d", instanceID)

	// Hand each pod-template generator its own copy of the labels.
	leaderPodTemplateSpec, err := r.generateLeaderPodTemplateSpec(ctx, opt, kubeName, maps.Clone(labels), instanceID)
	if err != nil {
		return nil, false, errors.Wrap(err, "generateLeaderWorkerSet: failed to generate leader pod template")
	}

	workerPodTemplateSpec, err := r.generateWorkerPodTemplateSpec(ctx, opt, kubeName, maps.Clone(labels), instanceID)
	if err != nil {
		return nil, false, errors.Wrap(err, "generateLeaderWorkerSet: failed to generate worker pod template")
	}

	// Each individual LeaderWorkerSet always has exactly 1 replica.
	singleReplica := int32(1)
	groupSize := opt.dynamoComponentDeployment.GetNumberOfNodes()

	leaderWorkerSet := &leaderworkersetv1.LeaderWorkerSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      kubeName,
			Namespace: kubeNs,
			Labels:    labels,
		},
		Spec: leaderworkersetv1.LeaderWorkerSetSpec{
			Replicas:      &singleReplica,
			StartupPolicy: leaderworkersetv1.LeaderCreatedStartupPolicy,
			LeaderWorkerTemplate: leaderworkersetv1.LeaderWorkerTemplate{
				LeaderTemplate: leaderPodTemplateSpec,
				WorkerTemplate: *workerPodTemplateSpec,
				Size:           &groupSize,
			},
		},
	}

	return leaderWorkerSet, false, nil
}

705
706
707
708
// lwsInstanceName returns "<dcd name>-<instanceID>", the name shared by the
// LeaderWorkerSet and the Volcano PodGroup of one LWS instance.
func lwsInstanceName(dcd *v1alpha1.DynamoComponentDeployment, instanceID int) string {
	return dcd.Name + "-" + fmt.Sprint(instanceID)
}

709
// FinalizeResource is the finalizer hook for a DynamoComponentDeployment. The
// reconciler passes itself to commonController.HandleFinalizer, which presumably
// calls this during deletion. It performs no cleanup of its own and always returns nil.
func (r *DynamoComponentDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) error {
	// Log only the object's identity: passing the whole object dumps its full
	// spec and status into the log line.
	log.FromContext(ctx).Info("Finalizing the DynamoComponentDeployment",
		"dynamoComponentDeployment", dynamoComponentDeployment.Name,
		"namespace", dynamoComponentDeployment.Namespace)
	return nil
}

716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
// IsDeploymentReady determines if a Kubernetes Deployment is fully ready and available.
// It checks various status fields to ensure all replicas are available and the deployment
// configuration has been fully applied.
func IsDeploymentReady(deployment *appsv1.Deployment) bool {
	if deployment == nil {
		return false
	}
	// Paused deployments should not be considered ready
	if deployment.Spec.Paused {
		return false
	}
	// Default to 1 replica if not specified
	desiredReplicas := int32(1)
	if deployment.Spec.Replicas != nil {
		desiredReplicas = *deployment.Spec.Replicas
	}
	// Special case: if no replicas are desired, the deployment is considered ready
	if desiredReplicas == 0 {
		return true
	}
	status := deployment.Status
	// Check all basic status requirements:
	// 1. ObservedGeneration: Deployment controller has observed the latest configuration
	// 2. UpdatedReplicas: All replicas have been updated to the latest version
	// 3. AvailableReplicas: All desired replicas are available (schedulable and healthy)
741
	// 4. Replicas: Total replicas equals desired (no surge pods remaining from rolling update)
742
743
	if status.ObservedGeneration < deployment.Generation ||
		status.UpdatedReplicas < desiredReplicas ||
744
745
		status.AvailableReplicas < desiredReplicas ||
		status.Replicas != desiredReplicas {
746
747
748
749
750
751
752
753
754
755
756
757
758
		return false
	}
	// Finally, check for the DeploymentAvailable condition
	// This is Kubernetes' own assessment that the deployment is available
	for _, cond := range deployment.Status.Conditions {
		if cond.Type == appsv1.DeploymentAvailable && cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	// If we get here, the basic checks passed but the Available condition wasn't found
	return false
}

759
760
// setStatusConditions merges conditions into the status of the DynamoComponentDeployment named by
// req and persists them through the status subresource.
//
// Each attempt re-reads the object first, so the conditions are applied on top of the latest
// version. An update rejected with a Conflict is retried, up to maxAttempts attempts in total,
// waiting retryDelay between attempts. The wait is abandoned if ctx is cancelled. Any other
// update error stops immediately. After a successful update the object is fetched again and
// returned.
//
// Errors: a failed Get is wrapped as "Failed to re-fetch DynamoComponentDeployment". A failed
// update is wrapped as "Failed to update DynamoComponentDeployment status". That includes a
// conflict that persists through the last attempt, or one pending when ctx is cancelled.
func (r *DynamoComponentDeploymentReconciler) setStatusConditions(ctx context.Context, req ctrl.Request, conditions ...metav1.Condition) (dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, err error) {
	const (
		maxAttempts = 3
		retryDelay  = 100 * time.Millisecond
	)

	dynamoComponentDeployment = &v1alpha1.DynamoComponentDeployment{}
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = r.Get(ctx, req.NamespacedName, dynamoComponentDeployment); err != nil {
			return dynamoComponentDeployment, errors.Wrap(err, "Failed to re-fetch DynamoComponentDeployment")
		}
		for _, condition := range conditions {
			meta.SetStatusCondition(&dynamoComponentDeployment.Status.Conditions, condition)
		}
		err = r.Status().Update(ctx, dynamoComponentDeployment)
		// Stop on success, on a non-conflict error, or when no attempts remain
		// (no point sleeping after the last one).
		if err == nil || !k8serrors.IsConflict(err) || attempt == maxAttempts {
			break
		}
		// Someone else wrote the object since our Get; pause briefly and retry against fresh
		// state, unless the caller has already given up.
		timer := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return dynamoComponentDeployment, errors.Wrap(err, "Failed to update DynamoComponentDeployment status")
		case <-timer.C:
		}
	}
	if err != nil {
		return dynamoComponentDeployment, errors.Wrap(err, "Failed to update DynamoComponentDeployment status")
	}

	if err = r.Get(ctx, req.NamespacedName, dynamoComponentDeployment); err != nil {
		return dynamoComponentDeployment, errors.Wrap(err, "Failed to re-fetch DynamoComponentDeployment")
	}
	return dynamoComponentDeployment, nil
}

791
792
793
// createOrUpdateOrDeleteDeployments reconciles the component's Deployment against the desired
// state produced by generateDeployment, via commonController.SyncResource. It returns whether the
// cluster object was modified and the resulting Deployment.
func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteDeployments(ctx context.Context, opt generateResourceOption) (bool, *appsv1.Deployment, error) {
	desired := func(ctx context.Context) (*appsv1.Deployment, bool, error) {
		return r.generateDeployment(ctx, opt)
	}
	changed, deployment, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, desired)
	if err != nil {
		return false, nil, errors.Wrap(err, "create or update deployment")
	}
	return changed, deployment, nil
}

801
802
// getResourceAnnotations returns Spec.Annotations of the component, or a new empty map when unset.
// The non-nil map is returned as-is (not copied), so callers should treat it as read-only.
func getResourceAnnotations(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string {
	if annotations := dynamoComponentDeployment.Spec.Annotations; annotations != nil {
		return annotations
	}
	return map[string]string{}
}

810
811
812
// createOrUpdateOrDeleteServices reconciles the component's Service against generateService via
// commonController.SyncResource and reports whether the cluster object changed.
func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteServices(ctx context.Context, opt generateResourceOption) (bool, error) {
	desired := func(context.Context) (*corev1.Service, bool, error) {
		return r.generateService(opt)
	}
	changed, _, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, desired)
	if err != nil {
		return false, err
	}
	return changed, nil
}

820
821
// createOrUpdateOrDeleteIngress reconciles the component's Ingress and, when the operator is
// configured to use Istio VirtualServices, its VirtualService as well. It reports whether any
// of the synced objects changed. The first error encountered is returned unwrapped.
func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteIngress(ctx context.Context, opt generateResourceOption) (bool, error) {
	ingressChanged, _, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
		return r.generateIngress(ctx, opt)
	})
	if err != nil {
		return false, err
	}
	if !r.Config.Ingress.UseVirtualService() {
		return ingressChanged, nil
	}

	vsChanged, _, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
		return r.generateVirtualService(ctx, opt)
	})
	if err != nil {
		return false, err
	}
	return ingressChanged || vsChanged, nil
}

839
func (r *DynamoComponentDeploymentReconciler) generateIngress(ctx context.Context, opt generateResourceOption) (*networkingv1.Ingress, bool, error) {
840
	log := log.FromContext(ctx)
841
842
843
844
	log.Info("Starting generateIngress")

	ingress := &networkingv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{
845
846
			Name:      opt.dynamoComponentDeployment.Name,
			Namespace: opt.dynamoComponentDeployment.Namespace,
847
848
		},
	}
849

850
	if opt.dynamoComponentDeployment.Spec.Ingress == nil || !opt.dynamoComponentDeployment.Spec.Ingress.Enabled || opt.dynamoComponentDeployment.Spec.Ingress.IngressControllerClassName == nil {
851
852
		log.Info("Ingress is not enabled")
		return ingress, true, nil
853
	}
854
	return dynamo.GenerateComponentIngress(ctx, opt.dynamoComponentDeployment.Name, opt.dynamoComponentDeployment.Namespace, *opt.dynamoComponentDeployment.Spec.Ingress), false, nil
855
856
}

857
func (r *DynamoComponentDeploymentReconciler) generateVirtualService(ctx context.Context, opt generateResourceOption) (*networkingv1beta1.VirtualService, bool, error) {
858
859
860
	log := log.FromContext(ctx)
	log.Info("Starting generateVirtualService")

861
862
	vs := &networkingv1beta1.VirtualService{
		ObjectMeta: metav1.ObjectMeta{
863
864
			Name:      opt.dynamoComponentDeployment.Name,
			Namespace: opt.dynamoComponentDeployment.Namespace,
865
		},
866
867
	}

868
	if !opt.dynamoComponentDeployment.Spec.Ingress.IsVirtualServiceEnabled() {
869
870
871
		log.Info("VirtualService is not enabled")
		return vs, true, nil
	}
872
	return dynamo.GenerateComponentVirtualService(ctx, opt.dynamoComponentDeployment.Name, opt.dynamoComponentDeployment.Namespace, *opt.dynamoComponentDeployment.Spec.Ingress), false, nil
873
874
}

875
// getKubeLabels returns a fresh label map for resources generated from the component.
// Spec.Labels are copied first, then the object's own metadata labels (which win on key
// collisions), then dynamo.AddBaseModelLabel is applied for Spec.ModelRef. A nil component
// yields an empty map.
func (r *DynamoComponentDeploymentReconciler) getKubeLabels(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string {
	out := map[string]string{}
	if dynamoComponentDeployment == nil {
		return out
	}
	// maps.Copy is a no-op for a nil source map.
	maps.Copy(out, dynamoComponentDeployment.Spec.Labels)
	maps.Copy(out, dynamoComponentDeployment.Labels)
	dynamo.AddBaseModelLabel(out, dynamoComponentDeployment.Spec.ModelRef)
	return out
}

889
890
// getKubeAnnotations returns a fresh annotation map for resources generated from the component.
// Spec.Annotations are copied first, then Spec.ExtraPodMetadata annotations (which win on key
// collisions), then dynamo.AddBaseModelAnnotation is applied for Spec.ModelRef. A nil component
// yields an empty map.
func (r *DynamoComponentDeploymentReconciler) getKubeAnnotations(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string {
	out := map[string]string{}
	if dynamoComponentDeployment == nil {
		return out
	}
	spec := &dynamoComponentDeployment.Spec
	// maps.Copy is a no-op for a nil source map.
	maps.Copy(out, spec.Annotations)
	if extra := spec.ExtraPodMetadata; extra != nil {
		maps.Copy(out, extra.Annotations)
	}
	dynamo.AddBaseModelAnnotation(out, spec.ModelRef)
	return out
}

//nolint:nakedret
904
905
func (r *DynamoComponentDeploymentReconciler) generateDeployment(ctx context.Context, opt generateResourceOption) (kubeDeployment *appsv1.Deployment, toDelete bool, err error) {
	kubeNs := opt.dynamoComponentDeployment.Namespace
906

907
	labels := r.getKubeLabels(opt.dynamoComponentDeployment)
908

909
	annotations := r.getKubeAnnotations(opt.dynamoComponentDeployment)
910

911
	kubeName := opt.dynamoComponentDeployment.Name
912

913
914
915
916
917
918
919
920
921
922
	kubeDeployment = &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:        kubeName,
			Namespace:   kubeNs,
			Labels:      labels,
			Annotations: annotations,
		},
	}

	// nolint: gosimple
923
	podTemplateSpec, err := r.generatePodTemplateSpec(ctx, opt, dynamo.RoleMain)
924
925
926
927
	if err != nil {
		return
	}

928
	maxSurge, maxUnavailable := getDeploymentRollingUpdateMaxSurgeAndMaxUnavailable(annotations)
929
930
931
932

	strategy := appsv1.DeploymentStrategy{
		Type: appsv1.RollingUpdateDeploymentStrategyType,
		RollingUpdate: &appsv1.RollingUpdateDeployment{
933
934
			MaxSurge:       &maxSurge,
			MaxUnavailable: &maxUnavailable,
935
936
937
		},
	}

938
	resourceAnnotations := getResourceAnnotations(opt.dynamoComponentDeployment)
939
940
	strategyStr := resourceAnnotations[KubeAnnotationDeploymentStrategy]
	if strategyStr != "" {
941
		strategyType := common.DeploymentStrategy(strategyStr)
942
		switch strategyType {
943
		case common.DeploymentStrategyRollingUpdate:
944
945
946
			strategy = appsv1.DeploymentStrategy{
				Type: appsv1.RollingUpdateDeploymentStrategyType,
				RollingUpdate: &appsv1.RollingUpdateDeployment{
947
948
					MaxSurge:       &maxSurge,
					MaxUnavailable: &maxUnavailable,
949
950
				},
			}
951
		case common.DeploymentStrategyRecreate:
952
953
954
955
956
957
			strategy = appsv1.DeploymentStrategy{
				Type: appsv1.RecreateDeploymentStrategyType,
			}
		}
	}

958
959
960
961
962
963
964
965
966
967
968
	// Checkpoint-restore pods must avoid overlap with prior replicas.
	// Enforce Recreate whenever the rendered template is a restore target so
	// the old pod is terminated before the restore placeholder is started.
	if podTemplateSpec != nil &&
		podTemplateSpec.Labels != nil &&
		podTemplateSpec.Labels[commonconsts.KubeLabelIsRestoreTarget] == commonconsts.KubeLabelValueTrue {
		strategy = appsv1.DeploymentStrategy{
			Type: appsv1.RecreateDeploymentStrategyType,
		}
	}

969
	kubeDeployment.Spec = appsv1.DeploymentSpec{
970
		Replicas: opt.dynamoComponentDeployment.Spec.Replicas,
971
972
		Selector: &metav1.LabelSelector{
			MatchLabels: map[string]string{
973
				commonconsts.KubeLabelDynamoSelector: kubeName,
974
975
			},
		},
976
977
		Template: *podTemplateSpec,
		Strategy: strategy,
978
979
980
981
982
	}

	return
}

983
984
985
986
987
988
989
990
991
992
993
994
995
996
// getDeploymentRollingUpdateMaxSurgeAndMaxUnavailable returns the rolling-update maxSurge and
// maxUnavailable values. Each is parsed with intstr.Parse from its annotation
// (KubeAnnotationDeploymentRollingUpdateMaxSurge / ...MaxUnavailable) when that annotation is
// non-empty. Otherwise it defaults to "25%".
func getDeploymentRollingUpdateMaxSurgeAndMaxUnavailable(annotations map[string]string) (intstr.IntOrString, intstr.IntOrString) {
	valueOrDefault := func(key string) intstr.IntOrString {
		if raw := annotations[key]; raw != "" {
			return intstr.Parse(raw)
		}
		return intstr.FromString("25%")
	}
	return valueOrDefault(KubeAnnotationDeploymentRollingUpdateMaxSurge),
		valueOrDefault(KubeAnnotationDeploymentRollingUpdateMaxUnavailable)
}

997
type generateResourceOption struct {
998
999
	dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment
	instanceID                *int
1000
}
1001
1002

//nolint:gocyclo,nakedret
1003
func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx context.Context, opt generateResourceOption, role dynamo.Role) (podTemplateSpec *corev1.PodTemplateSpec, err error) {
1004
	podLabels := r.getKubeLabels(opt.dynamoComponentDeployment)
1005

1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
	// Convert user-provided metrics annotation into controller-managed label
	// By default (no annotation), metrics are enabled
	metricsAnnotationValue := ""
	if opt.dynamoComponentDeployment.Spec.Annotations != nil {
		metricsAnnotationValue = opt.dynamoComponentDeployment.Spec.Annotations[commonconsts.KubeAnnotationEnableMetrics]
	}
	switch metricsAnnotationValue {
	case commonconsts.KubeLabelValueFalse:
		// Explicitly disabled, don't add the label
	default:
		// Any other value (including empty) enables metrics
		podLabels[commonconsts.KubeLabelMetricsEnabled] = commonconsts.KubeLabelValueTrue
	}

1020
1021
1022
	// Add label for the dynamo graph deployment on the pods themselves
	podLabels[commonconsts.KubeLabelDynamoGraphDeploymentName] = opt.dynamoComponentDeployment.Spec.Labels[commonconsts.KubeLabelDynamoGraphDeploymentName]

1023
1024
1025
1026
1027
	// Add component type label if specified
	if opt.dynamoComponentDeployment.Spec.ComponentType != "" {
		podLabels[commonconsts.KubeLabelDynamoComponentType] = opt.dynamoComponentDeployment.Spec.ComponentType
	}

1028
1029
1030
1031
	if opt.dynamoComponentDeployment.Spec.SubComponentType != "" {
		podLabels[commonconsts.KubeLabelDynamoSubComponentType] = opt.dynamoComponentDeployment.Spec.SubComponentType
	}

1032
	podAnnotations := make(map[string]string)
1033

1034
	kubeName := opt.dynamoComponentDeployment.Name
1035

1036
	resourceAnnotations := opt.dynamoComponentDeployment.Spec.Annotations
1037
1038
1039
1040
1041

	if resourceAnnotations == nil {
		resourceAnnotations = make(map[string]string)
	}

1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
	// Resolve checkpoint for this component
	var checkpointInfo *checkpoint.CheckpointInfo
	if opt.dynamoComponentDeployment.Spec.Checkpoint != nil && opt.dynamoComponentDeployment.Spec.Checkpoint.Enabled {
		info, err := checkpoint.ResolveCheckpointForService(ctx, r.Client, opt.dynamoComponentDeployment.Namespace, opt.dynamoComponentDeployment.Spec.Checkpoint)
		if err != nil {
			return nil, errors.Wrap(err, "failed to resolve checkpoint")
		}
		checkpointInfo = info
	}

	podSpec, err := dynamo.GenerateBasePodSpecForController(opt.dynamoComponentDeployment, r.DockerSecretRetriever, r.Config, role, commonconsts.MultinodeDeploymentTypeLWS, checkpointInfo)
1053
	if err != nil {
1054
		err = errors.Wrap(err, "failed to generate base pod spec")
1055
1056
1057
		return nil, err
	}

1058
	// Ensure we have at least one container (the main container should be there from GenerateBasePodSpec)
1059
	if len(podSpec.Containers) == 0 {
1060
		return nil, errors.New("no containers found in base pod spec")
1061
1062
	}

1063
	podLabels[commonconsts.KubeLabelDynamoSelector] = kubeName
1064

1065
	extraPodMetadata := opt.dynamoComponentDeployment.Spec.ExtraPodMetadata
1066
1067

	if extraPodMetadata != nil {
1068
1069
		maps.Copy(podAnnotations, extraPodMetadata.Annotations)
		maps.Copy(podLabels, extraPodMetadata.Labels)
1070
	}
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
	// Restore labels are operator-controlled. Clear any stale/user-provided
	// value after metadata merge; the controller re-adds it only when the
	// checkpoint contract below is satisfied.
	delete(podLabels, commonconsts.KubeLabelIsRestoreTarget)

	// Explicit restore orchestration contract:
	// only mark pods as restore targets when checkpoint material is ready.
	if checkpointInfo != nil && checkpointInfo.Enabled && checkpointInfo.Ready {
		podLabels[commonconsts.KubeLabelIsRestoreTarget] = commonconsts.KubeLabelValueTrue
		if checkpointInfo.Hash != "" {
			podLabels[commonconsts.KubeLabelCheckpointHash] = checkpointInfo.Hash
		}
	}
1084

1085
1086
1087
1088
1089
1090
	// Propagate restart annotation to pod template to trigger rolling restart
	// This is the same mechanism used by kubectl rollout restart
	if restartAt, exists := resourceAnnotations[commonconsts.RestartAnnotation]; exists {
		podAnnotations[commonconsts.RestartAnnotation] = restartAt
	}

1091
1092
	if podSpec.ServiceAccountName == "" {
		serviceAccounts := &corev1.ServiceAccountList{}
1093
		err = r.List(ctx, serviceAccounts, client.InNamespace(opt.dynamoComponentDeployment.Namespace), client.MatchingLabels{
1094
			commonconsts.KubeLabelDynamoComponentPod: commonconsts.KubeLabelValueTrue,
1095
1096
		})
		if err != nil {
1097
			err = errors.Wrapf(err, "failed to list service accounts in namespace %s", opt.dynamoComponentDeployment.Namespace)
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
			return
		}
		if len(serviceAccounts.Items) > 0 {
			podSpec.ServiceAccountName = serviceAccounts.Items[0].Name
		} else {
			podSpec.ServiceAccountName = DefaultServiceAccountName
		}
	}

	podTemplateSpec = &corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      podLabels,
			Annotations: podAnnotations,
		},
1112
		Spec: *podSpec,
1113
1114
1115
1116
1117
	}

	return
}

1118
func (r *DynamoComponentDeploymentReconciler) generateService(opt generateResourceOption) (*corev1.Service, bool, error) {
1119
	dcd := opt.dynamoComponentDeployment
1120

1121
	deleteStub := &corev1.Service{
1122
		ObjectMeta: metav1.ObjectMeta{
1123
1124
			Name:      dcd.Name,
			Namespace: dcd.Namespace,
1125
1126
1127
		},
	}

1128
	isK8sDiscovery := commonController.IsK8sDiscoveryEnabled(r.Config.Discovery.Backend, dcd.Spec.Annotations)
1129

1130
1131
	if !(isK8sDiscovery || dcd.IsFrontendComponent()) {
		return deleteStub, true, nil
1132
1133
	}

1134
1135
	if dcd.Spec.DynamoNamespace == nil {
		return nil, false, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", dcd.Name)
1136
1137
	}

1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
	svc, err := dynamo.GenerateComponentService(dynamo.ComponentServiceParams{
		ServiceName:     dcd.Name,
		Namespace:       dcd.Namespace,
		ComponentType:   dcd.Spec.ComponentType,
		DynamoNamespace: *dcd.Spec.DynamoNamespace,
		ComponentName:   dcd.Spec.ServiceName,
		Labels:          r.getKubeLabels(dcd),
		Annotations:     r.getKubeAnnotations(dcd),
		IsK8sDiscovery:  isK8sDiscovery,
	})
	if err != nil {
		return nil, false, err
1150
	}
1151
1152
	if dcd.IsMultinode() {
		svc.Spec.Selector["role"] = "leader"
1153
	}
1154
	return svc, false, nil
1155
1156
1157
}

// SetupWithManager sets up the controller with the Manager.
1158
func (r *DynamoComponentDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
1159
	m := ctrl.NewControllerManagedBy(mgr).
1160
		For(&v1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
1161
		Named(commonconsts.ResourceTypeDynamoComponentDeployment).
1162
1163
1164
1165
1166
1167
1168
		Owns(&appsv1.Deployment{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
1169
1170
1171
		Owns(&corev1.Service{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Owns(&networkingv1.Ingress{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Owns(&corev1.PersistentVolumeClaim{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
1172
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig))
1173

1174
	if r.RuntimeConfig.LWSEnabled {
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
		m.Owns(&leaderworkersetv1.LeaderWorkerSet{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the LeaderWorkerSet
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
			Owns(&volcanov1beta1.PodGroup{}, builder.WithPredicates(predicate.Funcs{
				// ignore creation cause we don't want to be called again after we create the LeaderWorkerSet
				CreateFunc:  func(ce event.CreateEvent) bool { return false },
				DeleteFunc:  func(de event.DeleteEvent) bool { return true },
				UpdateFunc:  func(de event.UpdateEvent) bool { return true },
				GenericFunc: func(ge event.GenericEvent) bool { return true },
			}))
	}

1191
	if r.Config.Ingress.UseVirtualService() {
1192
1193
		m.Owns(&networkingv1beta1.VirtualService{}, builder.WithPredicates(predicate.GenerationChangedPredicate{}))
	}
1194
	m.Owns(&autoscalingv2.HorizontalPodAutoscaler{})
1195
1196
1197
	// Wrap with metrics collection
	observedReconciler := observability.NewObservedReconciler(r, commonconsts.ResourceTypeDynamoComponentDeployment)
	return m.Complete(observedReconciler)
1198
}
1199
1200
1201
1202

// GetRecorder returns the EventRecorder stored on the reconciler (r.Recorder).
func (r *DynamoComponentDeploymentReconciler) GetRecorder() record.EventRecorder {
	return r.Recorder
}