graph.go 66.2 KB
Newer Older
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package dynamo
19
20
21

import (
	"context"
22
	"encoding/json"
23
	"fmt"
24
	"maps"
25
	"regexp"
26
	"sort"
27
28
	"strings"

29
30
31
32
	istioNetworking "istio.io/api/networking/v1beta1"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
33
	"k8s.io/utils/ptr"
34

35
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
36
	"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
37
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
38
39
40
	commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/discovery"
41
	grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
42
43
	"github.com/imdario/mergo"
	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
44
	corev1 "k8s.io/api/core/v1"
45
	networkingv1 "k8s.io/api/networking/v1"
46
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
47
48
)

49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// RestartState describes which DGD services should carry the restart
// annotation and the value that annotation should hold.
type RestartState struct {
	// Timestamp is the value written into the restart annotation.
	// DetermineRestartState fills it from the requested or last observed
	// restart ID.
	// NOTE(review): previously documented as RFC3339; that format is not
	// enforced here — confirm against the restart ID producer.
	Timestamp string
	// ServicesToAnnotate is the set of service names that should receive the
	// restart annotation; a service is included when its entry is true.
	ServicesToAnnotate map[string]bool
}

// ShouldAnnotateService reports whether serviceName should receive the
// restart annotation. Calling it on a nil *RestartState is allowed and
// always yields false.
func (s *RestartState) ShouldAnnotateService(serviceName string) bool {
	if s != nil {
		// Indexing a nil map yields false, so a nil set needs no special case.
		return s.ServicesToAnnotate[serviceName]
	}
	return false
}

// DetermineRestartState computes which DGD services should carry the restart
// annotation, and with which value, from the restart requested in the DGD spec
// and the restart status last recorded by the controller.
//
// It returns nil (push no new restart annotations) when:
//   - restartStatus is nil;
//   - the spec requests no restart and no restart was ever observed;
//   - the requested restart was already observed and is Superseded.
//
// Every service is annotated when the spec requests no restart but one was
// observed (value = observed ID), when the requested restart was observed and
// is Completed, or when the restart strategy is parallel. Otherwise the
// restart is sequential and only the services it has reached are annotated.
func DetermineRestartState(dgd *v1alpha1.DynamoGraphDeployment, restartStatus *v1alpha1.RestartStatus) *RestartState {
	if restartStatus == nil {
		return nil
	}

	requested := dgd.Spec.Restart
	if requested == nil || requested.ID == "" {
		// No restart requested: preserve the last observed restart, if any.
		if restartStatus.ObservedID == "" {
			return nil
		}
		return &RestartState{
			Timestamp:          restartStatus.ObservedID,
			ServicesToAnnotate: getAllServiceNames(dgd),
		}
	}

	// The requested restart has been observed when its ID matches the
	// observed one; anything else (including nothing observed yet) is new.
	observed := restartStatus.ObservedID != "" && requested.ID == restartStatus.ObservedID
	if observed {
		switch restartStatus.Phase {
		case v1alpha1.RestartPhaseSuperseded:
			// Superseded: push no new annotations. Existing annotations are
			// kept through the existingRestartAnnotations fallback when DCDs
			// are generated.
			return nil
		case v1alpha1.RestartPhaseCompleted:
			return &RestartState{
				Timestamp:          requested.ID,
				ServicesToAnnotate: getAllServiceNames(dgd),
			}
		}
	}

	var targets map[string]bool
	if IsParallelRestart(dgd) {
		targets = getAllServiceNames(dgd)
	} else {
		// Sequential is the default strategy.
		targets = getServicesToAnnotateForSequentialRestart(dgd, restartStatus)
	}
	return &RestartState{Timestamp: requested.ID, ServicesToAnnotate: targets}
}

// getAllServiceNames returns a set (name -> true) containing every service
// declared in dgd.Spec.Services.
func getAllServiceNames(dgd *v1alpha1.DynamoGraphDeployment) map[string]bool {
	names := make(map[string]bool, len(dgd.Spec.Services))
	for name := range dgd.Spec.Services {
		names[name] = true
	}
	return names
}

// IsParallelRestart reports whether the DGD requests a parallel restart.
// With no restart or no strategy configured the default is sequential, and
// this returns false.
func IsParallelRestart(dgd *v1alpha1.DynamoGraphDeployment) bool {
	restart := dgd.Spec.Restart
	return restart != nil &&
		restart.Strategy != nil &&
		restart.Strategy.Type == v1alpha1.RestartStrategyTypeParallel
}

// getServicesToAnnotateForSequentialRestart returns the services that should
// carry the restart annotation during a sequential restart.
//
// Services are walked in GetRestartOrder order:
//   - if the restart has not started yet (status nil, Pending phase, or
//     nothing in progress), only the first service is returned;
//   - otherwise every service up to and including the furthest in-progress
//     service is returned, so services that already restarted keep their
//     annotation.
//
// If none of the in-progress services appear in the order, or the order is
// empty, the result is empty.
func getServicesToAnnotateForSequentialRestart(dgd *v1alpha1.DynamoGraphDeployment, status *v1alpha1.RestartStatus) map[string]bool {
	result := make(map[string]bool)

	order := GetRestartOrder(dgd)
	if len(order) == 0 {
		return result
	}

	notStarted := status == nil ||
		status.Phase == v1alpha1.RestartPhasePending ||
		len(status.InProgress) == 0
	if notStarted {
		result[order[0]] = true
		return result
	}

	active := make(map[string]bool, len(status.InProgress))
	for _, name := range status.InProgress {
		active[name] = true
	}

	// Position of the last in-progress service in the order; -1 if none.
	last := -1
	for idx, name := range order {
		if active[name] {
			last = idx
		}
	}

	for _, name := range order[:last+1] {
		result[name] = true
	}
	return result
}

// GetRestartOrder returns the order in which services are restarted during a
// sequential restart.
//
// If the restart strategy specifies an explicit order, a copy of that order
// is returned. Otherwise all service names are returned in alphabetical
// order, so the result is deterministic. The returned slice is always owned
// by the caller: modifying it never affects the DGD spec.
func GetRestartOrder(dgd *v1alpha1.DynamoGraphDeployment) []string {
	if r := dgd.Spec.Restart; r != nil && r.Strategy != nil && len(r.Strategy.Order) > 0 {
		// Copy so callers cannot mutate the DGD spec through the result.
		return append([]string(nil), r.Strategy.Order...)
	}

	order := make([]string, 0, len(dgd.Spec.Services))
	for serviceName := range dgd.Spec.Services {
		order = append(order, serviceName)
	}
	sort.Strings(order)
	return order
}

191
// DynamoConfig is the "dynamo" section of a service's YAML configuration
// (referenced from Config.Dynamo).
type DynamoConfig struct {
	Enabled       bool   `yaml:"enabled"`
	Namespace     string `yaml:"namespace"`
	Name          string `yaml:"name"`
	ComponentType string `yaml:"component_type,omitempty"`
}

type (
	// Traffic is the "traffic" section of a service's YAML configuration.
	Traffic struct {
		Timeout int `yaml:"timeout"`
	}

	// Autoscaling is the "autoscaling" section of a service's YAML
	// configuration, bounding the service's replica count.
	Autoscaling struct {
		MinReplicas int `yaml:"min_replicas"`
		MaxReplicas int `yaml:"max_replicas"`
	}
)

type Config struct {
209
210
211
212
213
214
215
216
217
	Dynamo       *DynamoConfig          `yaml:"dynamo,omitempty"`
	Resources    *Resources             `yaml:"resources,omitempty"`
	Traffic      *Traffic               `yaml:"traffic,omitempty"`
	Autoscaling  *Autoscaling           `yaml:"autoscaling,omitempty"`
	HttpExposed  bool                   `yaml:"http_exposed,omitempty"`
	ApiEndpoints []string               `yaml:"api_endpoints,omitempty"`
	Workers      *int32                 `yaml:"workers,omitempty"`
	TotalGpus    *int32                 `yaml:"total_gpus,omitempty"`
	ExtraPodSpec *v1alpha1.ExtraPodSpec `yaml:"extraPodSpec,omitempty"`
218
219
220
221
222
223
224
225
}

// ServiceConfig is the YAML configuration of one service: its name, its
// declared dependencies, and its "config" section.
type ServiceConfig struct {
	Name         string              `yaml:"name"`
	Dependencies []map[string]string `yaml:"dependencies,omitempty"`
	Config       Config              `yaml:"config"`
}

226
227
228
229
230
231
232
// Resources holds optional CPU, memory, GPU and custom resource quantities,
// each expressed as a string.
type Resources struct {
	CPU    *string           `yaml:"cpu,omitempty" json:"cpu,omitempty"`
	Memory *string           `yaml:"memory,omitempty" json:"memory,omitempty"`
	GPU    *string           `yaml:"gpu,omitempty" json:"gpu,omitempty"`
	Custom map[string]string `yaml:"custom,omitempty" json:"custom,omitempty"`
}

// DynDeploymentConfig is the decoded dynamo deployment config JSON: a map
// from service name to that service's configuration.
type DynDeploymentConfig = map[string]*DynDeploymentServiceConfig

// DynDeploymentServiceConfig represents the configuration for a specific
// service within a DynDeploymentConfig.
type DynDeploymentServiceConfig struct {
	ServiceArgs *ServiceArgs `json:"ServiceArgs,omitempty"`
}

// ServiceArgs represents the arguments that can be passed to any service.
// When applied to a DynamoComponentDeployment, Workers overrides the replica
// count and each set field of Resources overrides both the resource requests
// and limits.
type ServiceArgs struct {
	Workers   *int32     `json:"workers,omitempty"`
	Resources *Resources `json:"resources,omitempty"`
}

246
247
248
249
250
251
252
// GetNamespace returns a pointer to the service's configured Dynamo
// namespace, or nil when there is no "dynamo" section or the namespace is
// empty.
func (s ServiceConfig) GetNamespace() *string {
	if d := s.Config.Dynamo; d != nil && d.Namespace != "" {
		return &d.Namespace
	}
	return nil
}

253
254
255
256
257
258
// ParseDynDeploymentConfig decodes the dynamo deployment config JSON into a
// DynDeploymentConfig keyed by service name.
//
// On a decoding error it returns a nil config and the error, never a
// partially decoded map. ctx is currently unused.
func ParseDynDeploymentConfig(ctx context.Context, jsonContent []byte) (DynDeploymentConfig, error) {
	var config DynDeploymentConfig
	if err := json.Unmarshal(jsonContent, &config); err != nil {
		return nil, err
	}
	return config, nil
}

259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
// RollingUpdateContext provides information about an in-progress rolling update.
type RollingUpdateContext struct {
	// NewWorkerHash is the short hash (8 chars) for the new worker spec, used for DCD naming
	NewWorkerHash string
	// OldWorkerReplicas maps service name to the desired replica count for old workers.
	// Used by the controller to patch old worker DCDs directly.
	// Calculated as: max(0, desiredReplicas - newReadyReplicas)
	OldWorkerReplicas map[string]int32
	// NewWorkerReplicas maps service name to the desired replica count for new workers.
	// Calculated as: min(desiredReplicas, newReadyReplicas + 1) to gradually scale up.
	NewWorkerReplicas map[string]int32
}

// InProgress reports whether a rolling update is underway, i.e. whether any
// old-worker replica count is being tracked (even one whose value is 0).
func (r RollingUpdateContext) InProgress() bool {
	return len(r.OldWorkerReplicas) != 0
}

// GenerateDynamoComponentsDeployments builds one DynamoComponentDeployment
// for every service declared in parentDGD.Spec.Services.
//
// The returned map is keyed by service name. If generating any service
// fails, that error is returned together with a nil map.
func GenerateDynamoComponentsDeployments(
	ctx context.Context,
	parentDGD *v1alpha1.DynamoGraphDeployment,
	defaultIngressSpec *v1alpha1.IngressSpec,
	restartState *RestartState,
	existingRestartAnnotations map[string]string,
	rollingUpdateCtx RollingUpdateContext,
) (map[string]*v1alpha1.DynamoComponentDeployment, error) {
	result := make(map[string]*v1alpha1.DynamoComponentDeployment, len(parentDGD.Spec.Services))
	for name, spec := range parentDGD.Spec.Services {
		dcd, err := generateSingleDCD(
			ctx,
			parentDGD,
			name,
			spec,
			parentDGD.GetDynamoNamespaceForService(spec),
			defaultIngressSpec,
			restartState,
			existingRestartAnnotations,
			rollingUpdateCtx,
		)
		if err != nil {
			return nil, err
		}
		result[name] = dcd
	}
	return result, nil
}

// GetDynamoNamespace returns the Dynamo namespace for a service owned by
// object. It delegates to v1alpha1.ComputeDynamoNamespace, passing the
// service's GlobalDynamoNamespace setting and the object's Kubernetes
// namespace and name.
func GetDynamoNamespace(object metav1.Object, service *v1alpha1.DynamoComponentDeploymentSharedSpec) string {
	k8sNamespace, ownerName := object.GetNamespace(), object.GetName()
	return v1alpha1.ComputeDynamoNamespace(service.GlobalDynamoNamespace, k8sNamespace, ownerName)
}

// generateSingleDCD builds the DynamoComponentDeployment for a single DGD
// service.
//
// The service spec is deep-copied before anything is modified. Without the
// copy, the mutations below would write back into parentDGD.Spec.Services
// through shared maps and pointers:
//   - annotation propagation and the restart annotation;
//   - the planner service account;
//   - dynamo deployment config resource overrides.
//
// Replicas: during a rolling update, worker components use
// rollingUpdateCtx.NewWorkerReplicas[componentName] when it is non-zero.
// Otherwise the component's own Replicas, if set, take precedence over any
// "workers" override from the dynamo deployment config.
func generateSingleDCD(
	ctx context.Context,
	parentDGD *v1alpha1.DynamoGraphDeployment,
	componentName string,
	component *v1alpha1.DynamoComponentDeploymentSharedSpec,
	dynamoNamespace string,
	defaultIngressSpec *v1alpha1.IngressSpec,
	restartState *RestartState,
	existingRestartAnnotations map[string]string,
	rollingUpdateCtx RollingUpdateContext,
) (*v1alpha1.DynamoComponentDeployment, error) {
	deployment := &v1alpha1.DynamoComponentDeployment{}
	// Deep copy: the shared spec holds maps (annotations, ...) and pointers
	// (ExtraPodSpec, Resources, ...) that are mutated below.
	deployment.Spec.DynamoComponentDeploymentSharedSpec = *component.DeepCopy()
	deployment.Name = GetDCDResourceName(parentDGD, componentName, rollingUpdateCtx.NewWorkerHash)
	deployment.Spec.BackendFramework = parentDGD.Spec.BackendFramework
	deployment.Namespace = parentDGD.Namespace
	deployment.Spec.ServiceName = componentName
	deployment.Spec.DynamoNamespace = &dynamoNamespace

	labels := make(map[string]string)
	maps.Copy(labels, component.Labels)
	labels[commonconsts.KubeLabelDynamoComponent] = componentName
	labels[commonconsts.KubeLabelDynamoNamespace] = dynamoNamespace
	labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = parentDGD.Name
	// Only worker DCDs are labelled with their hash, for cleanup during
	// rolling updates.
	if IsWorkerComponent(component.ComponentType) {
		labels[commonconsts.KubeLabelDynamoWorkerHash] = rollingUpdateCtx.NewWorkerHash
	}
	// Object labels and spec labels share this single map.
	deployment.Spec.Labels = labels
	deployment.Labels = labels

	propagateDGDAnnotations(parentDGD.GetAnnotations(), &deployment.Spec.DynamoComponentDeploymentSharedSpec)
	propagateDGDSpecMetadata(parentDGD.Spec.Annotations, parentDGD.Spec.Labels, &deployment.Spec.DynamoComponentDeploymentSharedSpec)

	// Apply the restart annotation if this service should be restarted;
	// otherwise carry over a previously applied restart annotation, if any.
	if restartState.ShouldAnnotateService(componentName) {
		if deployment.Spec.Annotations == nil {
			deployment.Spec.Annotations = make(map[string]string)
		}
		deployment.Spec.Annotations[commonconsts.RestartAnnotation] = restartState.Timestamp
	} else if existingRestartAt, ok := existingRestartAnnotations[componentName]; ok && existingRestartAt != "" {
		if deployment.Spec.Annotations == nil {
			deployment.Spec.Annotations = make(map[string]string)
		}
		deployment.Spec.Annotations[commonconsts.RestartAnnotation] = existingRestartAt
	}

	if component.ComponentType == commonconsts.ComponentTypePlanner {
		if deployment.Spec.ExtraPodSpec == nil {
			deployment.Spec.ExtraPodSpec = &v1alpha1.ExtraPodSpec{}
		}
		if deployment.Spec.ExtraPodSpec.PodSpec == nil {
			deployment.Spec.ExtraPodSpec.PodSpec = &corev1.PodSpec{}
		}
		deployment.Spec.ExtraPodSpec.PodSpec.ServiceAccountName = commonconsts.PlannerServiceAccountName
	}

	if deployment.IsFrontendComponent() && defaultIngressSpec != nil && deployment.Spec.Ingress == nil {
		deployment.Spec.Ingress = defaultIngressSpec
	}

	if len(parentDGD.Spec.Envs) > 0 {
		deployment.Spec.Envs = MergeEnvs(parentDGD.Spec.Envs, deployment.Spec.Envs)
	}

	if err := updateDynDeploymentConfig(deployment, commonconsts.DynamoServicePort); err != nil {
		return nil, err
	}
	if err := overrideWithDynDeploymentConfig(ctx, deployment); err != nil {
		return nil, err
	}

	// During a rolling update, worker replica counts come from
	// rollingUpdateCtx instead of the component spec.
	newReplicas := rollingUpdateCtx.NewWorkerReplicas[componentName]
	if rollingUpdateCtx.InProgress() && IsWorkerComponent(component.ComponentType) && newReplicas != 0 {
		deployment.Spec.Replicas = ptr.To(newReplicas)
	} else if component.Replicas != nil {
		// Copy the value so the DCD does not share the parent's pointer.
		deployment.Spec.Replicas = ptr.To(*component.Replicas)
	}

	return deployment, nil
}

391
392
393
// updateDynDeploymentConfig rewrites the dynamo deployment config stored on
// dynamoDeploymentComponent so that the entry for its own service has
// "port" set to newPort.
//
// It does nothing for non-frontend components or when no config is stored.
// The config is decoded and re-encoded, and the result is stored back even
// when the service has no object entry to update. Decode/encode failures
// are returned as wrapped errors.
func updateDynDeploymentConfig(dynamoDeploymentComponent *v1alpha1.DynamoComponentDeployment, newPort int) error {
	if !dynamoDeploymentComponent.IsFrontendComponent() {
		return nil
	}
	raw := dynamoDeploymentComponent.GetDynamoDeploymentConfig()
	if raw == nil {
		return nil
	}

	var parsed map[string]any
	if err := json.Unmarshal(raw, &parsed); err != nil {
		return fmt.Errorf("failed to unmarshal %v: %w", commonconsts.DynamoDeploymentConfigEnvVar, err)
	}
	if svc, ok := parsed[dynamoDeploymentComponent.Spec.ServiceName].(map[string]any); ok {
		svc["port"] = newPort
	}

	encoded, err := json.Marshal(parsed)
	if err != nil {
		return fmt.Errorf("failed to marshal updated config: %w", err)
	}
	dynamoDeploymentComponent.SetDynamoDeploymentConfig(encoded)
	return nil
}

func overrideWithDynDeploymentConfig(ctx context.Context, dynamoDeploymentComponent *v1alpha1.DynamoComponentDeployment) error {
	dynamoDeploymentConfig := dynamoDeploymentComponent.GetDynamoDeploymentConfig()
	if dynamoDeploymentConfig == nil {
		return nil
	}
	dynDeploymentConfig, err := ParseDynDeploymentConfig(ctx, dynamoDeploymentConfig)
	if err != nil {
		return fmt.Errorf("failed to parse %v: %w", commonconsts.DynamoDeploymentConfigEnvVar, err)
	}
	componentDynConfig := dynDeploymentConfig[dynamoDeploymentComponent.Spec.ServiceName]
	if componentDynConfig != nil {
		if componentDynConfig.ServiceArgs != nil && componentDynConfig.ServiceArgs.Workers != nil {
			dynamoDeploymentComponent.Spec.Replicas = componentDynConfig.ServiceArgs.Workers
		}
		if componentDynConfig.ServiceArgs != nil && componentDynConfig.ServiceArgs.Resources != nil {
431
432
			requests := &v1alpha1.ResourceItem{}
			limits := &v1alpha1.ResourceItem{}
433
			if dynamoDeploymentComponent.Spec.Resources == nil {
434
				dynamoDeploymentComponent.Spec.Resources = &v1alpha1.Resources{
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
					Requests: requests,
					Limits:   limits,
				}
			} else {
				if dynamoDeploymentComponent.Spec.Resources.Requests != nil {
					requests = dynamoDeploymentComponent.Spec.Resources.Requests
				} else {
					dynamoDeploymentComponent.Spec.Resources.Requests = requests
				}
				if dynamoDeploymentComponent.Spec.Resources.Limits != nil {
					limits = dynamoDeploymentComponent.Spec.Resources.Limits
				} else {
					dynamoDeploymentComponent.Spec.Resources.Limits = limits
				}
			}
			if componentDynConfig.ServiceArgs.Resources.GPU != nil {
				requests.GPU = *componentDynConfig.ServiceArgs.Resources.GPU
				limits.GPU = *componentDynConfig.ServiceArgs.Resources.GPU
			}
			if componentDynConfig.ServiceArgs.Resources.CPU != nil {
				requests.CPU = *componentDynConfig.ServiceArgs.Resources.CPU
				limits.CPU = *componentDynConfig.ServiceArgs.Resources.CPU
			}
			if componentDynConfig.ServiceArgs.Resources.Memory != nil {
				requests.Memory = *componentDynConfig.ServiceArgs.Resources.Memory
				limits.Memory = *componentDynConfig.ServiceArgs.Resources.Memory
			}
			if componentDynConfig.ServiceArgs.Resources.Custom != nil {
				requests.Custom = componentDynConfig.ServiceArgs.Resources.Custom
				limits.Custom = componentDynConfig.ServiceArgs.Resources.Custom
			}
		}
	}
	return nil
}

471
// MergeEnvs merges two env var lists into one with unique names. An entry in
// specific overrides an entry in common with the same Name; within a single
// list, a later duplicate wins. The result is sorted by Name so generated
// specs are deterministic.
//
// NOTE(review): sorting discards the input order. Kubernetes only expands
// $(VAR) references to variables defined earlier in the list, so dependent
// env vars may not expand as intended after a merge — confirm this is
// acceptable for callers.
func MergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
	byName := make(map[string]corev1.EnvVar, len(common)+len(specific))
	for _, list := range [][]corev1.EnvVar{common, specific} {
		for _, env := range list {
			byName[env.Name] = env
		}
	}

	names := make([]string, 0, len(byName))
	for name := range byName {
		names = append(names, name)
	}
	sort.Strings(names)

	merged := make([]corev1.EnvVar, 0, len(names))
	for _, name := range names {
		merged = append(merged, byName[name])
	}
	return merged
}
494

495
496
497
498
499
500
501
502
503
// GetDCDResourceName returns the Kubernetes resource name for a service's
// DynamoComponentDeployment: "<dgd name>-<lowercased service name>".
//
// When workerSuffix is non-empty and the service is a worker component,
// "-<workerSuffix>" is appended; for other components the suffix is ignored.
// Callers naming non-DCD resources (e.g. an Ingress or VirtualService) should
// pass "" as workerSuffix.
func GetDCDResourceName(dgd *v1alpha1.DynamoGraphDeployment, serviceName string, workerSuffix string) string {
	name := dgd.Name + "-" + strings.ToLower(serviceName)
	if workerSuffix == "" {
		return name
	}
	spec := dgd.Spec.Services[serviceName]
	if spec == nil || !IsWorkerComponent(spec.ComponentType) {
		return name
	}
	return name + "-" + workerSuffix
}
505
506
507
508
509

// SecretsRetriever looks up image pull secret names.
//
// GetSecrets returns the names of the secrets available in namespace for the
// given registry. resolveImagePullSecrets passes a full image reference as the
// second argument.
type SecretsRetriever interface {
	GetSecrets(namespace, registry string) ([]string, error)
}

510
511
512
513
514
515
516
517
518
519
520
521
// resolveImagePullSecrets asks retriever for the pull secrets matching image
// in namespace and returns them as LocalObjectReferences, in the order the
// retriever returned them.
//
// Lookup errors are not propagated: on error it returns nil, and the caller
// continues without image pull secrets.
func resolveImagePullSecrets(retriever SecretsRetriever, namespace, image string) []corev1.LocalObjectReference {
	secretNames, err := retriever.GetSecrets(namespace, image)
	if err != nil {
		return nil
	}
	refs := make([]corev1.LocalObjectReference, len(secretNames))
	for i, secretName := range secretNames {
		refs[i] = corev1.LocalObjectReference{Name: secretName}
	}
	return refs
}

522
// applyCliqueStartupDependencies configures StartsAfter dependencies for cliques in a PodCliqueSet
523
524
525
526
527
528
// based on the backend framework and multinode deployment patterns.
//
// Rules:
// - For VLLM and SGLang: worker cliques start after leader clique
// - For TRTLLM: leader clique starts after worker cliques
// - Only applies to multinode deployments (numberOfNodes > 1)
529
// - Sets the PodCliqueSet StartupType to Explicit if any dependencies are configured
530
func applyCliqueStartupDependencies(
531
	gangSet *grovev1alpha1.PodCliqueSet,
532
533
534
535
	roles []ServiceRole,
	backendFramework BackendFramework,
	numberOfNodes int32,
) {
536
537
538
	// enabled for TRTLLM multinode deployments only
	// TODO: reactivate for all backends when we have a better way to handle the readiness probe for the leader.
	enabled := backendFramework == BackendFrameworkTRTLLM && numberOfNodes > 1
539

540
	if !enabled {
541
542
		return // No dependencies for single-node deployments
	}
543

544
545
546
	// Build maps of leader and worker clique names
	var leaderCliqueName string
	var workerCliqueNames []string
547

548
549
550
551
552
553
554
	for _, r := range roles {
		cliqueName := strings.ToLower(r.Name)
		switch r.Role {
		case RoleLeader:
			leaderCliqueName = cliqueName
		case RoleWorker:
			workerCliqueNames = append(workerCliqueNames, cliqueName)
555
		}
556
557
558
559
560
561
562
563
564
565
566
567
	}

	// Apply dependencies to cliques
	hasDependencies := false
	for _, clique := range gangSet.Spec.Template.Cliques {
		// Find the corresponding role for this clique
		var cliqueRole Role
		for _, r := range roles {
			if strings.ToLower(r.Name) == clique.Name {
				cliqueRole = r.Role
				break
			}
568
569
		}

570
571
572
573
574
		// Determine dependencies for this clique
		startsAfter := getCliqueStartupDependencies(cliqueRole, backendFramework, leaderCliqueName, workerCliqueNames)
		if len(startsAfter) > 0 {
			clique.Spec.StartsAfter = startsAfter
			hasDependencies = true
575
		}
576
	}
577

578
579
580
581
582
583
	// Set explicit startup type if we have any dependencies
	if hasDependencies {
		explicitStartupType := grovev1alpha1.CliqueStartupTypeExplicit
		gangSet.Spec.Template.StartupType = &explicitStartupType
	}
}
584

585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
// getCliqueStartupDependencies returns the names of the cliques that a
// clique with the given role must start after, or nil if there are none:
//   - VLLM / SGLang: a worker clique starts after the leader clique, when a
//     leader clique name is known;
//   - TRTLLM: the leader clique starts after all worker cliques, when any
//     exist;
//   - any other backend/role combination: no dependencies.
//
// For TRTLLM leaders, the workerCliqueNames slice itself is returned, not a
// copy.
func getCliqueStartupDependencies(
	role Role,
	backendFramework BackendFramework,
	leaderCliqueName string,
	workerCliqueNames []string,
) []string {
	workerWaitsForLeader := backendFramework == BackendFrameworkVLLM || backendFramework == BackendFrameworkSGLang
	if workerWaitsForLeader && role == RoleWorker && leaderCliqueName != "" {
		return []string{leaderCliqueName}
	}
	if backendFramework == BackendFrameworkTRTLLM && role == RoleLeader && len(workerCliqueNames) > 0 {
		return workerCliqueNames
	}
	return nil
}

615
616
617
618
619
620
621
622
623
624
625
626
627
// ComponentServiceParams carries the inputs GenerateComponentService needs to
// build the Kubernetes Service for one Dynamo component. Both the DGD (Grove)
// and DCD controllers construct it.
type ComponentServiceParams struct {
	// ServiceName becomes the Service name (dots are replaced with hyphens).
	ServiceName string
	// Namespace is the Kubernetes namespace of the Service.
	Namespace string
	// ComponentType selects the exposed port and is part of the selector.
	ComponentType string
	// DynamoNamespace is part of the Service selector.
	DynamoNamespace string
	// ComponentName is the original user-provided name, used in the selector.
	ComponentName string
	// Labels and Annotations are copied onto the Service's metadata.
	Labels      map[string]string
	Annotations map[string]string
	// IsK8sDiscovery adds the Kubernetes discovery labels to the Service.
	IsK8sDiscovery bool
}
628

629
func GenerateComponentService(params ComponentServiceParams) (*corev1.Service, error) {
630
	var servicePort corev1.ServicePort
631
	switch params.ComponentType {
632
	case commonconsts.ComponentTypeFrontend:
633
634
635
636
637
638
		servicePort = corev1.ServicePort{
			Name:       commonconsts.DynamoServicePortName,
			Port:       commonconsts.DynamoServicePort,
			TargetPort: intstr.FromString(commonconsts.DynamoContainerPortName),
			Protocol:   corev1.ProtocolTCP,
		}
639
640
641
642
643
644
645
646
647
	case commonconsts.ComponentTypeEPP:
		servicePort = corev1.ServicePort{
			Name:        commonconsts.EPPGRPCPortName,
			Port:        commonconsts.EPPGRPCPort,
			TargetPort:  intstr.FromInt(commonconsts.EPPGRPCPort),
			Protocol:    corev1.ProtocolTCP,
			AppProtocol: ptr.To("http2"),
		}
	default:
648
649
650
651
652
653
654
		servicePort = corev1.ServicePort{
			Name:       commonconsts.DynamoSystemPortName,
			Port:       commonconsts.DynamoSystemPort,
			TargetPort: intstr.FromString(commonconsts.DynamoSystemPortName),
			Protocol:   corev1.ProtocolTCP,
		}
	}
655
656

	labels := make(map[string]string)
657
	for k, v := range params.Labels {
658
659
		labels[k] = v
	}
660
	if params.IsK8sDiscovery {
661
662
663
664
		labels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes"
		labels[commonconsts.KubeLabelDynamoDiscoveryEnabled] = commonconsts.KubeLabelValueTrue
	}

665
666
667
668
669
670
671
672
673
674
675
	selector := map[string]string{
		commonconsts.KubeLabelDynamoComponentType: params.ComponentType,
		commonconsts.KubeLabelDynamoNamespace:     params.DynamoNamespace,
		commonconsts.KubeLabelDynamoComponent:     params.ComponentName,
	}

	annotations := make(map[string]string)
	for k, v := range params.Annotations {
		annotations[k] = v
	}

676
677
	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
678
679
680
			// Service names must be DNS-1035 labels (no dots). Replace dots with
			// hyphens so model names like "Qwen3-0.6B" don't cause rejections.
			Name:        strings.ReplaceAll(params.ServiceName, ".", "-"),
681
682
683
			Namespace:   params.Namespace,
			Labels:      labels,
			Annotations: annotations,
684
685
		},
		Spec: corev1.ServiceSpec{
686
687
			Selector: selector,
			Ports:    []corev1.ServicePort{servicePort},
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
		},
	}
	return service, nil
}

// GenerateComponentIngress builds an Ingress named componentName in
// componentNamespace. The Ingress sends all traffic on the host derived from
// ingressSpec (path "/", prefix match) to port DynamoServicePort of the
// component's Service. TLS for that host is configured when ingressSpec.TLS
// is set.
//
// The backend Service name gets the dot-to-hyphen replacement that
// GenerateComponentService applies to Service names, because Service names
// cannot contain dots. The Ingress object keeps componentName unchanged.
// ctx is currently unused.
func GenerateComponentIngress(ctx context.Context, componentName, componentNamespace string, ingressSpec v1alpha1.IngressSpec) *networkingv1.Ingress {
	host := getIngressHost(ingressSpec)
	// Service names must be DNS-1035 labels (no dots); reference the Service
	// by the same sanitized name it is created with.
	backendServiceName := strings.ReplaceAll(componentName, ".", "-")

	ingress := &networkingv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{
			Name:      componentName,
			Namespace: componentNamespace,
		},
		Spec: networkingv1.IngressSpec{
			IngressClassName: ingressSpec.IngressControllerClassName,
			Rules: []networkingv1.IngressRule{
				{
					Host: host,
					IngressRuleValue: networkingv1.IngressRuleValue{
						HTTP: &networkingv1.HTTPIngressRuleValue{
							Paths: []networkingv1.HTTPIngressPath{
								{
									Path:     "/",
									PathType: ptr.To(networkingv1.PathTypePrefix),
									Backend: networkingv1.IngressBackend{
										Service: &networkingv1.IngressServiceBackend{
											Name: backendServiceName,
											Port: networkingv1.ServiceBackendPort{
												Number: commonconsts.DynamoServicePort,
											},
										},
									},
								},
							},
						},
					},
				},
			},
		},
	}
	if ingressSpec.TLS != nil {
		ingress.Spec.TLS = []networkingv1.IngressTLS{
			{
				Hosts:      []string{host},
				SecretName: ingressSpec.TLS.SecretName,
			},
		}
	}
	return ingress
}

// getIngressHost returns "<prefix><host>.<suffix>" for ingressSpec.
// The prefix is HostPrefix (empty when unset). The suffix is HostSuffix,
// falling back to commonconsts.DefaultIngressSuffix when HostSuffix is nil.
func getIngressHost(ingressSpec v1alpha1.IngressSpec) string {
	prefix := ""
	if ingressSpec.HostPrefix != nil {
		prefix = *ingressSpec.HostPrefix
	}
	suffix := commonconsts.DefaultIngressSuffix
	if ingressSpec.HostSuffix != nil {
		suffix = *ingressSpec.HostSuffix
	}
	return prefix + ingressSpec.Host + "." + suffix
}

func GenerateComponentVirtualService(ctx context.Context, componentName, componentNamespace string, ingressSpec v1alpha1.IngressSpec) *networkingv1beta1.VirtualService {
	vs := &networkingv1beta1.VirtualService{
		ObjectMeta: metav1.ObjectMeta{
			Name:      componentName,
			Namespace: componentNamespace,
		},
	}
758
759
760
761
762
763
764
765
766
767
768
769
770
	if ingressSpec.IsVirtualServiceEnabled() {
		vs.Spec = istioNetworking.VirtualService{
			Hosts: []string{
				getIngressHost(ingressSpec),
			},
			Gateways: []string{*ingressSpec.VirtualServiceGateway},
			Http: []*istioNetworking.HTTPRoute{
				{
					Match: []*istioNetworking.HTTPMatchRequest{
						{
							Uri: &istioNetworking.StringMatch{
								MatchType: &istioNetworking.StringMatch_Prefix{Prefix: "/"},
							},
771
772
						},
					},
773
774
775
776
777
778
779
					Route: []*istioNetworking.HTTPRouteDestination{
						{
							Destination: &istioNetworking.Destination{
								Host: componentName,
								Port: &istioNetworking.PortSelector{
									Number: commonconsts.DynamoServicePort,
								},
780
781
782
783
784
							},
						},
					},
				},
			},
785
		}
786
787
788
789
	}
	return vs
}

790
func GenerateDefaultIngressSpec(dynamoDeployment *v1alpha1.DynamoGraphDeployment, ingressConfig configv1alpha1.IngressConfiguration) v1alpha1.IngressSpec {
791
	res := v1alpha1.IngressSpec{
792
		Enabled:           ingressConfig.VirtualServiceGateway != "" || ingressConfig.ControllerClassName != "",
793
794
795
		Host:              dynamoDeployment.Name,
		UseVirtualService: ingressConfig.VirtualServiceGateway != "",
	}
796
797
	if ingressConfig.ControllerClassName != "" {
		res.IngressControllerClassName = &ingressConfig.ControllerClassName
798
	}
799
	if ingressConfig.ControllerTLSSecretName != "" {
800
		res.TLS = &v1alpha1.IngressTLSSpec{
801
			SecretName: ingressConfig.ControllerTLSSecretName,
802
803
		}
	}
804
805
	if ingressConfig.HostSuffix != "" {
		res.HostSuffix = &ingressConfig.HostSuffix
806
807
808
809
810
811
	}
	if ingressConfig.VirtualServiceGateway != "" {
		res.VirtualServiceGateway = &ingressConfig.VirtualServiceGateway
	}
	return res
}
812
813
814
815
816
817
818

// Role identifies the part a group of pods plays for a service. Roles are:
//   - leader and worker: the two parts of a multinode deployment;
//   - main: the single group of a single-node deployment;
//   - checkpoint.
//
// Use this type instead of a bare string wherever a role is expected.
type Role string

const (
	RoleLeader     Role = "leader"
	RoleWorker     Role = "worker"
	RoleMain       Role = "main"
	RoleCheckpoint Role = "checkpoint"
)

// ServiceRole is one role that a service expands into (see
// expandRolesForService). Fields:
//   - Name: the role's name; its lowercased form is used as the clique name
//     when clique startup dependencies are configured;
//   - Role: the part it plays;
//   - Replicas: how many replicas it runs.
type ServiceRole struct {
	Name     string
	Role     Role
	Replicas int32
}

// expandRolesForService returns the roles a service is expanded into.
//
// A multinode service (numberOfNodes > 1) yields a leader role with one
// replica plus a worker role with numberOfNodes-1 replicas. In that case
// serviceReplicas is ignored here. Any other service yields a single RoleMain
// entry named after the service, with *serviceReplicas replicas (1 when nil).
func expandRolesForService(serviceName string, serviceReplicas *int32, numberOfNodes int32) []ServiceRole {
	if numberOfNodes <= 1 {
		replicas := int32(1)
		if serviceReplicas != nil {
			replicas = *serviceReplicas
		}
		return []ServiceRole{{Name: serviceName, Role: RoleMain, Replicas: replicas}}
	}

	return []ServiceRole{
		{Name: serviceName + "-" + commonconsts.GroveRoleSuffixLeader, Role: RoleLeader, Replicas: 1},
		{Name: serviceName + "-" + commonconsts.GroveRoleSuffixWorker, Role: RoleWorker, Replicas: numberOfNodes - 1},
	}
}

// Define BackendFramework enum for sglang, vllm, trtllm

type BackendFramework string

const (
	BackendFrameworkSGLang BackendFramework = "sglang"
	BackendFrameworkVLLM   BackendFramework = "vllm"
	BackendFrameworkTRTLLM BackendFramework = "trtllm"
857
	BackendFrameworkNoop   BackendFramework = "noop"
858
859
)

860
861
862
863
864
865
866
867
868
869
870
871
// ParseBackendFramework converts a string to BackendFramework type.
// Returns an error if the framework string is not recognized.
func ParseBackendFramework(framework string) (BackendFramework, error) {
	bf := BackendFramework(framework)
	switch bf {
	case BackendFrameworkVLLM, BackendFrameworkSGLang, BackendFrameworkTRTLLM, BackendFrameworkNoop:
		return bf, nil
	default:
		return "", fmt.Errorf("unsupported backend framework: %s (valid values: vllm, sglang, trtllm)", framework)
	}
}

872
873
874
// Backend interface for modular backend logic
// Each backend (SGLang, VLLM, etc.) implements this interface
type Backend interface {
875
	UpdateContainer(container *corev1.Container, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer)
876
	UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer)
877
878
879
880
881
}

// NoopBackend does no processing - used for non-worker components like frontend, planner, router
type NoopBackend struct{}

882
func (b *NoopBackend) UpdateContainer(container *corev1.Container, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) {
883
884
885
	// No-op: frontend, planner, router, etc. don't need backend-specific processing
}

886
func (b *NoopBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) {
887
888
889
	// No-op: frontend, planner, router, etc. don't need backend-specific processing
}

890
891
892
// MultinodeDeployer abstracts how the orchestrator (Grove or LWS, see
// MultinodeDeployerFactory) lays out the nodes of a multinode service.
type MultinodeDeployer interface {
	// GetLeaderHostname returns the leader's hostname for serviceName.
	GetLeaderHostname(serviceName string) string
	// GetHostNames returns hostnames for serviceName's numberOfNodes nodes
	// (presumably one per node — confirm in the implementations).
	GetHostNames(serviceName string, numberOfNodes int32) []string
	// GetNodeRank returns the node rank and whether that value must be
	// interpreted by a shell (needsShellInterpretation).
	GetNodeRank() (string, bool)
	// NeedsDNSWait reports whether a DNS wait is needed before multinode
	// components are launched.
	NeedsDNSWait() bool
}

897
// BackendFactory creates backend instances based on the framework type
898
func BackendFactory(backendFramework BackendFramework, operatorConfig *configv1alpha1.OperatorConfiguration, parentGraphDeploymentName string) Backend {
899
900
901
902
	switch backendFramework {
	case BackendFrameworkSGLang:
		return &SGLangBackend{}
	case BackendFrameworkVLLM:
903
		return &VLLMBackend{ParentGraphDeploymentName: parentGraphDeploymentName}
904
	case BackendFrameworkTRTLLM:
905
		return &TRTLLMBackend{
906
			MpiRunSecretName: operatorConfig.MPI.SSHSecretName,
907
		}
908
909
910
911
912
913
914
	case BackendFrameworkNoop:
		return &NoopBackend{}
	default:
		return nil
	}
}

915
916
917
918
919
920
921
922
923
924
925
// MultinodeDeployerFactory returns the MultinodeDeployer for the given
// deployment type (Grove or LWS), or nil for any other type.
func MultinodeDeployerFactory(multinodeDeploymentType commonconsts.MultinodeDeploymentType) MultinodeDeployer {
	var deployer MultinodeDeployer // stays a nil interface for unknown types
	switch multinodeDeploymentType {
	case commonconsts.MultinodeDeploymentTypeGrove:
		deployer = &GroveMultinodeDeployer{}
	case commonconsts.MultinodeDeploymentTypeLWS:
		deployer = &LWSMultinodeDeployer{}
	}
	return deployer
}

926
927
// IsWorkerComponent checks if a component is a worker that needs backend framework detection
func IsWorkerComponent(componentType string) bool {
928
929
930
	return componentType == commonconsts.ComponentTypeWorker ||
		componentType == commonconsts.ComponentTypePrefill ||
		componentType == commonconsts.ComponentTypeDecode
931
932
}

933
934
935
// AddStandardEnvVars adds the standard environment variables that are common to
// both checkpoint jobs and generated worker pods.
func AddStandardEnvVars(container *corev1.Container, operatorConfig *configv1alpha1.OperatorConfiguration) {
936
	standardEnvVars := []corev1.EnvVar{}
937
	if operatorConfig.Infrastructure.NATSAddress != "" {
938
		standardEnvVars = append(standardEnvVars, corev1.EnvVar{
939
			Name:  "NATS_SERVER",
940
			Value: operatorConfig.Infrastructure.NATSAddress,
941
942
943
		})
	}

944
	if operatorConfig.Infrastructure.ETCDAddress != "" {
945
		standardEnvVars = append(standardEnvVars, corev1.EnvVar{
946
			Name:  "ETCD_ENDPOINTS",
947
			Value: operatorConfig.Infrastructure.ETCDAddress,
948
949
		})
	}
950

951
	if operatorConfig.Infrastructure.ModelExpressURL != "" {
952
		standardEnvVars = append(standardEnvVars, corev1.EnvVar{
953
			Name:  "MODEL_EXPRESS_URL",
954
			Value: operatorConfig.Infrastructure.ModelExpressURL,
955
956
		})
	}
957
	if operatorConfig.Infrastructure.PrometheusEndpoint != "" {
958
959
		standardEnvVars = append(standardEnvVars, corev1.EnvVar{
			Name:  "PROMETHEUS_ENDPOINT",
960
			Value: operatorConfig.Infrastructure.PrometheusEndpoint,
961
962
963
964
		})
	}
	// merge the env vars to allow users to override the standard env vars
	container.Env = MergeEnvs(standardEnvVars, container.Env)
965
966
}

967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
// applyDefaultSecurityContext fills in secure defaults on the pod security
// context. At present it only defaults fsGroup, which fixes volume permission
// issues. runAsUser, runAsGroup and runAsNonRoot are deliberately left alone,
// so images that expect to run as root keep working. An fsGroup that is
// already set is never overwritten.
func applyDefaultSecurityContext(podSpec *corev1.PodSpec) {
	securityContext := podSpec.SecurityContext
	if securityContext == nil {
		securityContext = &corev1.PodSecurityContext{}
		podSpec.SecurityContext = securityContext
	}

	if securityContext.FSGroup != nil {
		return
	}
	// Setting only fsGroup avoids forcing a UID/GID, which keeps both root and
	// non-root images compatible.
	securityContext.FSGroup = ptr.To(int64(commonconsts.DefaultSecurityContextFSGroup))
}

986
987
988
// GenerateBasePodSpec creates the PodSpec for one role of a component. The
// logic is shared between the controller (DCD) and Grove paths.
// Deployment-specific environment merging is the caller's responsibility.
//
// The main container is assembled in this order:
//  1. component-type defaults (ComponentDefaultsFactory);
//  2. extraPodSpec.MainContainer, merged with override; the default env is
//     merged back in underneath it, and its startupProbe (if set) replaces
//     the default wholesale;
//  3. component.Envs;
//  4. livenessProbe / readinessProbe, each replaced wholesale when set;
//  5. resource requests/limits (per-key overwrite) and claims (appended);
//  6. envFromSecret, then the standard operator env vars from
//     AddStandardEnvVars (NATS_SERVER, ETCD_ENDPOINTS, MODEL_EXPRESS_URL,
//     PROMETHEUS_ENDPOINT when configured; values already on the container
//     take precedence);
//  7. PVC-backed volume mounts;
//  8. backend-specific container changes (Backend.UpdateContainer).
//
// The pod spec then proceeds as follows:
//   - it starts from the component-type base pod spec and is merged with
//     extraPodSpec.PodSpec;
//   - it gets the default security context only if the user supplied none;
//   - it receives the container, the volumes, shared memory, image pull
//     secrets and backend pod-level changes;
//   - the optional frontend and GPU-memory-service sidecars are added last.
//
// checkpointInfo is currently not read by this function.
//
//nolint:gocyclo
func GenerateBasePodSpec(
	component *v1alpha1.DynamoComponentDeploymentSharedSpec,
	backendFramework BackendFramework,
	secretsRetriever SecretsRetriever,
	parentGraphDeploymentName string,
	namespace string,
	role Role,
	numberOfNodes int32,
	operatorConfig *configv1alpha1.OperatorConfiguration,
	multinodeDeploymentType commonconsts.MultinodeDeploymentType,
	serviceName string,
	checkpointInfo *checkpoint.CheckpointInfo, // Optional checkpoint info (resolved by ResolveCheckpointForService)
) (*corev1.PodSpec, error) {
	// Start with base container generated per component type
	componentContext := generateComponentContext(component, parentGraphDeploymentName, namespace, numberOfNodes, NewDiscoveryContext(operatorConfig.Discovery.Backend, component.Annotations))

	componentDefaults := ComponentDefaultsFactory(component.ComponentType)
	container, err := componentDefaults.GetBaseContainer(componentContext)
	if err != nil {
		return nil, fmt.Errorf("failed to get base container: %w", err)
	}

	if component.ExtraPodSpec != nil && component.ExtraPodSpec.MainContainer != nil {
		main := component.ExtraPodSpec.MainContainer.DeepCopy()
		if main != nil {
			// Overlay the user's main-container overrides onto the default
			// container. mergo.WithOverride replaces non-empty slices rather than
			// merging them, so remember the default env to merge it back below.
			containerEnvs := container.Env
			err = mergo.Merge(&container, *main, mergo.WithOverride)
			if err != nil {
				return nil, fmt.Errorf("failed to merge extraPodSpec: %w", err)
			}

			// main container fields that require special handling
			container.Env = MergeEnvs(containerEnvs, container.Env)
			// Note: startup probe does not have its own top level field so it must be passed in extraPodSpec.MainContainer
			// We want to overwrite entirely if provided rather than merge
			if main.StartupProbe != nil {
				container.StartupProbe = main.StartupProbe
			}
		}
	}
	container.Env = MergeEnvs(container.Env, component.Envs)

	// Merge probes entirely if they are passed (no partial merge)
	if component.LivenessProbe != nil {
		container.LivenessProbe = component.LivenessProbe.DeepCopy()
	}
	if component.ReadinessProbe != nil {
		container.ReadinessProbe = component.ReadinessProbe.DeepCopy()
	}

	overrideResources, err := controller_common.GetResourcesConfig(component.Resources)
	if err != nil {
		return nil, fmt.Errorf("failed to get resources config: %w", err)
	}

	// Requests
	if overrideResources != nil && len(overrideResources.Requests) > 0 {
		if container.Resources.Requests == nil {
			container.Resources.Requests = corev1.ResourceList{}
		}
		maps.Copy(container.Resources.Requests, overrideResources.Requests)
	}

	// Limits
	if overrideResources != nil && len(overrideResources.Limits) > 0 {
		if container.Resources.Limits == nil {
			container.Resources.Limits = corev1.ResourceList{}
		}
		maps.Copy(container.Resources.Limits, overrideResources.Limits)
	}

	// Claims (append handles a nil slice, so no pre-initialization is needed)
	if overrideResources != nil && len(overrideResources.Claims) > 0 {
		container.Resources.Claims = append(container.Resources.Claims, overrideResources.Claims...)
	}

	shouldDisableImagePullSecret := component.Annotations[commonconsts.KubeAnnotationDisableImagePullSecretDiscovery] == commonconsts.KubeLabelValueTrue

	imagePullSecrets := []corev1.LocalObjectReference{}
	if !shouldDisableImagePullSecret && secretsRetriever != nil && component.ExtraPodSpec != nil && component.ExtraPodSpec.MainContainer != nil && component.ExtraPodSpec.MainContainer.Image != "" {
		imagePullSecrets = resolveImagePullSecrets(secretsRetriever, namespace, component.ExtraPodSpec.MainContainer.Image)
	}
	if component.EnvFromSecret != nil {
		container.EnvFrom = append(container.EnvFrom, corev1.EnvFromSource{
			SecretRef: &corev1.SecretEnvSource{
				LocalObjectReference: corev1.LocalObjectReference{Name: *component.EnvFromSecret},
			},
		})
	}

	AddStandardEnvVars(&container, operatorConfig)

	// PVC-backed volumes only. The shared-memory volume is added directly to the
	// pod spec by ApplySharedMemoryVolumeAndMount below.
	volumes := make([]corev1.Volume, 0, len(component.VolumeMounts))

	for _, volumeMount := range component.VolumeMounts {
		if volumeMount.Name == "" {
			return nil, fmt.Errorf("volumeMount.name is required when volumeMounts is set")
		}

		// Determine mount point
		mountPoint := volumeMount.MountPoint
		if volumeMount.UseAsCompilationCache && mountPoint == "" {
			// Use backend-specific default for compilation cache
			defaultMountPoint := getDefaultCompilationCacheMountPoint(backendFramework)
			if defaultMountPoint == "" {
				return nil, fmt.Errorf("volumeMount with useAsCompilationCache=true requires an explicit mountPoint for backend framework %s (no default available)", backendFramework)
			}
			mountPoint = defaultMountPoint
		} else if !volumeMount.UseAsCompilationCache && mountPoint == "" {
			return nil, fmt.Errorf("volumeMount.mountPoint is required when useAsCompilationCache is false")
		}

		volumes = append(volumes, corev1.Volume{
			Name: volumeMount.Name,
			VolumeSource: corev1.VolumeSource{
				PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
					ClaimName: volumeMount.Name,
				},
			},
		})

		container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
			Name:      volumeMount.Name,
			MountPath: mountPoint,
		})
	}
	// Apply backend-specific container modifications
	multinodeDeployer := MultinodeDeployerFactory(multinodeDeploymentType)
	if multinodeDeployer == nil {
		return nil, fmt.Errorf("unsupported multinode deployment type: %s", multinodeDeploymentType)
	}
	backend := BackendFactory(backendFramework, operatorConfig, parentGraphDeploymentName)
	if backend == nil {
		return nil, fmt.Errorf("unsupported backend framework: %s", backendFramework)
	}
	backend.UpdateContainer(&container, numberOfNodes, role, component, serviceName, multinodeDeployer)

	// get base podspec from component
	podSpec, err := componentDefaults.GetBasePodSpec(componentContext)
	if err != nil {
		return nil, fmt.Errorf("failed to get base podspec: %w", err)
	}

	// Check if user provided their own security context before merging
	userProvidedSecurityContext := component.ExtraPodSpec != nil &&
		component.ExtraPodSpec.PodSpec != nil &&
		component.ExtraPodSpec.PodSpec.SecurityContext != nil

	if component.ExtraPodSpec != nil && component.ExtraPodSpec.PodSpec != nil {
		// merge extraPodSpec PodSpec with base podspec
		err := mergo.Merge(&podSpec, component.ExtraPodSpec.PodSpec.DeepCopy(), mergo.WithOverride)
		if err != nil {
			return nil, fmt.Errorf("failed to merge extraPodSpec: %w", err)
		}
	}

	// Apply default security context ONLY if user didn't provide any security context
	// If user provides ANY securityContext (even partial), they get full control with no defaults injected
	// This allows users to intentionally set fields to nil (e.g., to run as root)
	if !userProvidedSecurityContext {
		applyDefaultSecurityContext(&podSpec)
	}

	if controller_common.IsK8sDiscoveryEnabled(operatorConfig.Discovery.Backend, component.Annotations) {
		if podSpec.ServiceAccountName == "" {
			podSpec.ServiceAccountName = discovery.GetK8sDiscoveryServiceAccountName(parentGraphDeploymentName)
		}
	}

	podSpec.Volumes = append(podSpec.Volumes, volumes...)
	ApplySharedMemoryVolumeAndMount(&podSpec, &container, component.SharedMemory)
	podSpec.Containers = append(podSpec.Containers, container)
	podSpec.ImagePullSecrets = controller_common.AppendUniqueImagePullSecrets(podSpec.ImagePullSecrets, imagePullSecrets)

	backend.UpdatePodSpec(&podSpec, numberOfNodes, role, component, serviceName, multinodeDeployer)

	// Inject auto-generated frontend sidecar if configured
	if component.FrontendSidecar != nil {
		sidecar, err := generateFrontendSidecar(component.FrontendSidecar, componentContext, operatorConfig)
		if err != nil {
			return nil, fmt.Errorf("failed to generate frontend sidecar: %w", err)
		}
		podSpec.Containers = append(podSpec.Containers, sidecar)

		if !shouldDisableImagePullSecret && secretsRetriever != nil {
			podSpec.ImagePullSecrets = controller_common.AppendUniqueImagePullSecrets(
				podSpec.ImagePullSecrets,
				resolveImagePullSecrets(secretsRetriever, namespace, component.FrontendSidecar.Image),
			)
		}
	}

	// Inject GMS sidecar with DRA shared GPU access when GPU memory service is enabled.
	if isGMSEnabled(component) {
		claimTemplateName := GMSResourceClaimTemplateName(parentGraphDeploymentName, serviceName)
		if err := applyGPUMemoryService(&podSpec, component, claimTemplateName); err != nil {
			return nil, fmt.Errorf("failed to apply GPU memory service: %w", err)
		}
	}

	return &podSpec, nil
}

// setMetricsLabels turns the user-facing metrics annotation on the DGD into
// the controller-managed metrics-enabled label. Metrics are on by default.
// Only an annotation whose value is explicitly "false" suppresses the label;
// any other value, including an empty one, enables it.
func setMetricsLabels(labels map[string]string, dynamoGraphDeployment *v1alpha1.DynamoGraphDeployment) {
	value, annotated := dynamoGraphDeployment.Annotations[commonconsts.KubeAnnotationEnableMetrics]
	explicitlyDisabled := annotated && value == commonconsts.KubeLabelValueFalse
	if !explicitlyDisabled {
		labels[commonconsts.KubeLabelMetricsEnabled] = commonconsts.KubeLabelValueTrue
	}
}

1207
func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentSharedSpec, parentGraphDeploymentName string, namespace string, numberOfNodes int32, discovery DiscoveryContext) ComponentContext {
1208
	dynamoNamespace := v1alpha1.ComputeDynamoNamespace(component.GlobalDynamoNamespace, namespace, parentGraphDeploymentName)
1209
1210
1211
1212
1213
	var workerHashSuffix string
	if IsWorkerComponent(component.ComponentType) && component.Labels[commonconsts.KubeLabelDynamoWorkerHash] != "" {
		workerHashSuffix = component.Labels[commonconsts.KubeLabelDynamoWorkerHash]
	}

1214
1215
	componentContext := ComponentContext{
		numberOfNodes:                  numberOfNodes,
1216
		ComponentType:                  component.ComponentType,
1217
1218
		ParentGraphDeploymentName:      parentGraphDeploymentName,
		ParentGraphDeploymentNamespace: namespace,
1219
		Discovery:                      discovery,
1220
		DynamoNamespace:                dynamoNamespace,
1221
		EPPConfig:                      component.EPPConfig,
1222
		WorkerHashSuffix:               workerHashSuffix,
1223
1224
1225
1226
	}
	return componentContext
}

1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
// generateFrontendSidecar builds a fully configured frontend sidecar
// container. It reuses the FrontendDefaults logic that standalone frontend
// services use, so users running the frontend as a sidecar (e.g. GAIE
// deployments) do not have to spell out Dynamo env vars, probes and ports.
//
// On top of the frontend defaults, the sidecar is configured as follows:
//   - it gets the fixed sidecar container name and spec.Image;
//   - spec.Args replaces the default args when non-empty;
//   - spec.EnvFromSecret is appended as an envFrom source;
//   - spec.Envs are merged in;
//   - the standard operator env vars are added last.
func generateFrontendSidecar(
	spec *v1alpha1.FrontendSidecarSpec,
	parentContext ComponentContext,
	operatorConfig *configv1alpha1.OperatorConfiguration,
) (corev1.Container, error) {
	// The sidecar is always a single-node frontend. It shares the parent's
	// deployment identity, discovery settings and Dynamo namespace.
	sidecarContext := ComponentContext{
		numberOfNodes:                  1,
		ComponentType:                  commonconsts.ComponentTypeFrontend,
		ParentGraphDeploymentName:      parentContext.ParentGraphDeploymentName,
		ParentGraphDeploymentNamespace: parentContext.ParentGraphDeploymentNamespace,
		Discovery:                      parentContext.Discovery,
		DynamoNamespace:                parentContext.DynamoNamespace,
	}

	defaults := NewFrontendDefaults()
	sidecar, err := defaults.GetBaseContainer(sidecarContext)
	if err != nil {
		return corev1.Container{}, fmt.Errorf("failed to get frontend base container: %w", err)
	}

	sidecar.Name = commonconsts.FrontendSidecarContainerName
	sidecar.Image = spec.Image

	if len(spec.Args) != 0 {
		sidecar.Args = spec.Args
	}

	if secretName := spec.EnvFromSecret; secretName != nil {
		secretSource := corev1.EnvFromSource{
			SecretRef: &corev1.SecretEnvSource{
				LocalObjectReference: corev1.LocalObjectReference{Name: *secretName},
			},
		}
		sidecar.EnvFrom = append(sidecar.EnvFrom, secretSource)
	}

	if len(spec.Envs) != 0 {
		sidecar.Env = MergeEnvs(sidecar.Env, spec.Envs)
	}

	AddStandardEnvVars(&sidecar, operatorConfig)
	return sidecar, nil
}

1275
1276
// GeneratePodSpecForComponent creates a PodSpec for Grove deployments (simplified wrapper)
func GeneratePodSpecForComponent(
1277
	component *v1alpha1.DynamoComponentDeploymentSharedSpec,
1278
1279
1280
1281
1282
	backendFramework BackendFramework,
	secretsRetriever SecretsRetriever,
	dynamoDeployment *v1alpha1.DynamoGraphDeployment,
	role Role,
	numberOfNodes int32,
1283
	operatorConfig *configv1alpha1.OperatorConfiguration,
1284
1285
	multinodeDeploymentType commonconsts.MultinodeDeploymentType,
	serviceName string,
1286
	checkpointInfo *checkpoint.CheckpointInfo, // Optional checkpoint info
1287
) (*corev1.PodSpec, error) {
1288
1289
1290
	if len(dynamoDeployment.Spec.Envs) > 0 {
		component.Envs = MergeEnvs(dynamoDeployment.Spec.Envs, component.Envs)
	}
1291
1292

	propagateDGDAnnotations(dynamoDeployment.GetAnnotations(), component)
1293
	propagateDGDSpecMetadata(dynamoDeployment.Spec.Annotations, dynamoDeployment.Spec.Labels, component)
1294

1295
	podSpec, err := GenerateBasePodSpec(component, backendFramework, secretsRetriever, dynamoDeployment.Name, dynamoDeployment.Namespace, role, numberOfNodes, operatorConfig, multinodeDeploymentType, serviceName, checkpointInfo)
1296
	if err != nil {
1297
		return nil, err
1298
1299
1300
1301
	}
	return podSpec, nil
}

1302
1303
1304
1305
1306
1307
// dgdPropagatedAnnotationKeys lists DGD metadata annotations that are propagated
// to component-level annotations (for both the DCD/controller and Grove paths).
// Service-level annotations take precedence (are never overwritten).
var dgdPropagatedAnnotationKeys = []string{
	commonconsts.KubeAnnotationEnableMetrics,
	commonconsts.KubeAnnotationDynamoDiscoveryBackend,
1308
	commonconsts.KubeAnnotationDynamoKubeDiscoveryMode,
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
	commonconsts.KubeAnnotationDynamoOperatorOriginVersion,
	commonconsts.KubeAnnotationVLLMDistributedExecutorBackend,
}

// propagateDGDAnnotations copies the allow-listed DGD-level annotations
// (dgdPropagatedAnnotationKeys) into the component's annotations, so that
// downstream logic can read them from a single place. Keys the service
// already defines are left untouched.
func propagateDGDAnnotations(dgdAnnotations map[string]string, component *v1alpha1.DynamoComponentDeploymentSharedSpec) {
	for _, key := range dgdPropagatedAnnotationKeys {
		dgdValue, onDGD := dgdAnnotations[key]
		if !onDGD {
			continue
		}
		if component.Annotations == nil {
			component.Annotations = make(map[string]string)
		}
		if _, onService := component.Annotations[key]; onService {
			continue
		}
		component.Annotations[key] = dgdValue
	}
}

1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
// propagateDGDSpecMetadata copies the DGD spec-level annotations and labels
// into the component as low-priority defaults: any key the component already
// has keeps its service-level value. The component's maps are only allocated
// when there is something to copy.
func propagateDGDSpecMetadata(annotations, labels map[string]string, component *v1alpha1.DynamoComponentDeploymentSharedSpec) {
	if len(annotations) > 0 && component.Annotations == nil {
		component.Annotations = make(map[string]string, len(annotations))
	}
	for key, value := range annotations {
		if _, present := component.Annotations[key]; !present {
			component.Annotations[key] = value
		}
	}

	if len(labels) > 0 && component.Labels == nil {
		component.Labels = make(map[string]string, len(labels))
	}
	for key, value := range labels {
		if _, present := component.Labels[key]; !present {
			component.Labels[key] = value
		}
	}
}

1350
1351
// GenerateGrovePodCliqueSet generates a Grove PodCliqueSet for the given
// deployment, supporting both single-node and multinode services.
//
// Each service is expanded into roles (see expandRolesForService), and every
// role becomes a PodClique. Multinode services also get a
// PodCliqueScalingGroupConfig that groups their leader and worker cliques.
// Services are processed in sorted name order so that the Cliques and
// scaling-group lists are identical across reconciles. Ranging over the
// Services map directly would yield a different order each time.
//
// Note: the component specs in dynamoDeployment.Spec.Services are mutated in
// place. This covers DynamoNamespace, the discovery-backend annotation, and
// the DGD-level envs/annotations/labels folded in by
// GeneratePodSpecForComponent. generateLabels and generateAnnotations then
// read those values.
func GenerateGrovePodCliqueSet(
	ctx context.Context,
	dynamoDeployment *v1alpha1.DynamoGraphDeployment,
	operatorConfig *configv1alpha1.OperatorConfiguration,
	runtimeConfig *controller_common.RuntimeConfig,
	kubeClient ctrlclient.Reader,
	secretsRetriever SecretsRetriever,
	restartState *RestartState,
	existingRestartAnnotations map[string]string,
	checkpointInfoByService map[string]*checkpoint.CheckpointInfo, // Optional checkpoint info per service
) (*grovev1alpha1.PodCliqueSet, error) {
	gangSet := &grovev1alpha1.PodCliqueSet{}
	gangSet.Name = dynamoDeployment.Name
	gangSet.Namespace = dynamoDeployment.Namespace
	gangSet.Labels = maps.Clone(dynamoDeployment.Spec.Labels)
	gangSet.Annotations = maps.Clone(dynamoDeployment.Spec.Annotations)
	gangSet.Spec.Replicas = 1
	gangSet.Spec.Template.HeadlessServiceConfig = &grovev1alpha1.HeadlessServiceConfig{
		PublishNotReadyAddresses: true,
	}
	gangSet.Spec.Template.StartupType = ptr.To(grovev1alpha1.CliqueStartupTypeAnyOrder)
	if operatorConfig.Orchestrators.Grove.TerminationDelay.Duration > 0 {
		gangSet.Spec.Template.TerminationDelay = &operatorConfig.Orchestrators.Grove.TerminationDelay
	}

	// Inject deployment-level topology constraint (PCS template).
	// specToGroveTopologyConstraint returns nil when input is nil, so this is a no-op without TAS.
	gangSet.Spec.Template.TopologyConstraint = specToGroveTopologyConstraint(dynamoDeployment.Spec.TopologyConstraint)

	// Validate kai-scheduler queue once if kai-scheduler is enabled
	var validatedQueueName string
	if runtimeConfig.GroveEnabled && runtimeConfig.KaiSchedulerEnabled {
		var err error
		validatedQueueName, err = DetermineKaiSchedulerQueue(ctx, dynamoDeployment.Annotations)
		if err != nil {
			return nil, fmt.Errorf("failed to determine kai-scheduler queue: %w", err)
		}
	}

	discoveryBackend := controller_common.GetDiscoveryBackend(operatorConfig.Discovery.Backend, dynamoDeployment.Annotations)
	discoveryContext := NewDiscoveryContext(operatorConfig.Discovery.Backend, dynamoDeployment.Annotations)

	// Go map iteration order is randomized; iterate services by sorted name so
	// the generated Cliques and PodCliqueScalingGroupConfigs are deterministic.
	serviceNames := make([]string, 0, len(dynamoDeployment.Spec.Services))
	for name := range dynamoDeployment.Spec.Services {
		serviceNames = append(serviceNames, name)
	}
	sort.Strings(serviceNames)

	var scalingGroups []grovev1alpha1.PodCliqueScalingGroupConfig
	for _, serviceName := range serviceNames {
		component := dynamoDeployment.Spec.Services[serviceName]
		dynamoNamespace := GetDynamoNamespace(dynamoDeployment, component)
		component.DynamoNamespace = &dynamoNamespace

		// Determine backend framework using hybrid approach
		backendFramework, err := getBackendFrameworkFromComponent(component, dynamoDeployment)
		if err != nil {
			return nil, fmt.Errorf("failed to determine backend framework for service %s: %w", serviceName, err)
		}

		if discoveryBackend != "" {
			if component.Annotations == nil {
				component.Annotations = make(map[string]string)
			}
			component.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend] = string(discoveryBackend)
		}

		// Get checkpoint info for this service if available
		var checkpointInfo *checkpoint.CheckpointInfo
		if checkpointInfoByService != nil {
			checkpointInfo = checkpointInfoByService[serviceName]
		}

		numberOfNodes := component.GetNumberOfNodes()
		isMultinode := numberOfNodes > 1
		roles := expandRolesForService(serviceName, component.Replicas, numberOfNodes)
		var cliqueNames []string

		for _, r := range roles {
			podSpec, err := GeneratePodSpecForComponent(
				component,
				backendFramework,
				secretsRetriever,
				dynamoDeployment,
				r.Role,
				numberOfNodes,
				operatorConfig,
				commonconsts.MultinodeDeploymentTypeGrove,
				serviceName,
				checkpointInfo,
			)
			if err != nil {
				return nil, fmt.Errorf("failed to generate podSpec for role %s: %w", r.Name, err)
			}

			if operatorConfig.Checkpoint.Enabled {
				if err := checkpoint.InjectCheckpointIntoPodSpec(
					ctx,
					kubeClient,
					dynamoDeployment.Namespace,
					podSpec,
					checkpointInfo,
				); err != nil {
					return nil, fmt.Errorf("failed to inject checkpoint config for role %s: %w", r.Name, err)
				}
			}

			minAvailable := int32(1)
			if isMultinode {
				minAvailable = r.Replicas
			}

			clique := &grovev1alpha1.PodCliqueTemplateSpec{
				Name: strings.ToLower(r.Name),
				Spec: grovev1alpha1.PodCliqueSpec{
					RoleName:     strings.ToLower(r.Name),
					Replicas:     r.Replicas,
					MinAvailable: ptr.To(minAvailable),
					PodSpec:      *podSpec,
				},
			}

			// For single-node services, set topology constraint directly on the clique.
			// For multinode services, the constraint goes on the PCSG instead;
			// child cliques inherit from PCSG and should NOT have explicit constraints.
			if !isMultinode {
				clique.TopologyConstraint = toGroveTopologyConstraint(component.TopologyConstraint)
			}
			labels, err := generateLabels(component, dynamoDeployment, serviceName, discoveryContext)
			if err != nil {
				return nil, fmt.Errorf("failed to generate labels: %w", err)
			}
			clique.Labels = labels
			annotations, err := generateAnnotations(component)
			if err != nil {
				return nil, fmt.Errorf("failed to generate annotations: %w", err)
			}
			checkpoint.ApplyRestorePodMetadata(labels, annotations, checkpointInfo)

			// Apply restart annotation if this service should be restarted.
			// For services not in the current restart order, preserve their existing annotation
			// to avoid triggering unwanted rollouts when a new restart begins.
			if restartState.ShouldAnnotateService(serviceName) {
				if annotations == nil {
					annotations = make(map[string]string)
				}
				annotations[commonconsts.RestartAnnotation] = restartState.Timestamp
			} else if existingRestartAnnotations != nil {
				if existingTimestamp, ok := existingRestartAnnotations[serviceName]; ok {
					if annotations == nil {
						annotations = make(map[string]string)
					}
					annotations[commonconsts.RestartAnnotation] = existingTimestamp
				}
			}
			clique.Annotations = annotations

			// Inject kai-scheduler settings if enabled
			injectKaiSchedulerIfEnabled(clique, runtimeConfig, validatedQueueName)

			gangSet.Spec.Template.Cliques = append(gangSet.Spec.Template.Cliques, clique)
			cliqueNames = append(cliqueNames, strings.ToLower(r.Name))
		}

		// Apply startup dependencies for this service
		applyCliqueStartupDependencies(gangSet, roles, backendFramework, numberOfNodes)

		if isMultinode {
			scalingGroups = append(scalingGroups, grovev1alpha1.PodCliqueScalingGroupConfig{
				Name:               strings.ToLower(serviceName),
				CliqueNames:        cliqueNames,
				Replicas:           component.Replicas,
				MinAvailable:       ptr.To(int32(1)),
				TopologyConstraint: toGroveTopologyConstraint(component.TopologyConstraint),
			})
		}
	}
	if len(scalingGroups) > 0 {
		gangSet.Spec.Template.PodCliqueScalingGroupConfigs = scalingGroups
	}

	return gangSet, nil
}

1527
1528
1529
1530
func generateLabels(
	component *v1alpha1.DynamoComponentDeploymentSharedSpec,
	dynamoDeployment *v1alpha1.DynamoGraphDeployment,
	componentName string,
1531
	discovery DiscoveryContext,
1532
) (map[string]string, error) {
1533
	labels := make(map[string]string)
1534
	labels[commonconsts.KubeLabelDynamoSelector] = GetDCDResourceName(dynamoDeployment, componentName, "")
1535
	labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = dynamoDeployment.Name
1536
	labels[commonconsts.KubeLabelDynamoComponent] = componentName
1537
1538
1539
	if component.DynamoNamespace != nil {
		labels[commonconsts.KubeLabelDynamoNamespace] = *component.DynamoNamespace
	}
1540
1541
1542
	if component.ComponentType != "" {
		labels[commonconsts.KubeLabelDynamoComponentType] = component.ComponentType
	}
1543
1544
1545
	if component.SubComponentType != "" {
		labels[commonconsts.KubeLabelDynamoSubComponentType] = component.SubComponentType
	}
1546
1547
	// Add base model label if modelRef is specified
	AddBaseModelLabel(labels, component.ModelRef)
1548
	// Merge user-supplied labels first so they cannot overwrite checkpoint labels.
1549
1550
	setMetricsLabels(labels, dynamoDeployment)
	if component.Labels != nil {
1551
		if err := mergo.Merge(&labels, component.Labels, mergo.WithOverride); err != nil {
1552
1553
1554
1555
			return nil, fmt.Errorf("failed to merge labels: %w", err)
		}
	}
	if component.ExtraPodMetadata != nil {
1556
		if err := mergo.Merge(&labels, component.ExtraPodMetadata.Labels, mergo.WithOverride); err != nil {
1557
1558
1559
			return nil, fmt.Errorf("failed to merge extraPodMetadata labels: %w", err)
		}
	}
1560
	// Re-apply system labels after user merge to prevent override
1561
1562
1563
	labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = dynamoDeployment.Name
	if component.ComponentType != "" {
		labels[commonconsts.KubeLabelDynamoComponentType] = component.ComponentType
1564
	}
1565
1566
1567
1568
1569
	if component.DynamoNamespace != nil {
		labels[commonconsts.KubeLabelDynamoNamespace] = *component.DynamoNamespace
	}
	if workerHash := component.Labels[commonconsts.KubeLabelDynamoWorkerHash]; workerHash != "" {
		labels[commonconsts.KubeLabelDynamoWorkerHash] = workerHash
1570
	}
1571
1572
1573
1574
1575
	// Discovery labels on pod template — needed for Pod reflector filtering in container mode
	if discovery.Backend == configv1alpha1.DiscoveryBackendKubernetes {
		labels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes"
		labels[commonconsts.KubeLabelDynamoDiscoveryEnabled] = commonconsts.KubeLabelValueTrue
	}
1576
1577
1578
	return labels, nil
}

1579
func generateAnnotations(component *v1alpha1.DynamoComponentDeploymentSharedSpec) (map[string]string, error) {
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
	annotations := make(map[string]string)
	if component.Annotations != nil {
		err := mergo.Merge(&annotations, component.Annotations, mergo.WithOverride)
		if err != nil {
			return nil, fmt.Errorf("failed to merge annotations: %w", err)
		}
	}
	if component.ExtraPodMetadata != nil {
		err := mergo.Merge(&annotations, component.ExtraPodMetadata.Annotations, mergo.WithOverride)
		if err != nil {
			return nil, fmt.Errorf("failed to merge extraPodMetadata annotations: %w", err)
		}
	}
	return annotations, nil
}

// backendFrameworkCommandPatterns maps each supported backend to a regexp matching a
// `python -m dynamo.<backend>...` invocation. The patterns are compiled once at package
// initialization instead of on every call. They are kept in a slice rather than a map so
// that detection order, and therefore the "multiple frameworks" error text, is deterministic.
var backendFrameworkCommandPatterns = []struct {
	framework BackendFramework
	pattern   *regexp.Regexp
}{
	{BackendFrameworkVLLM, regexp.MustCompile(`python[0-9.]*\s+[^|&;]*-m\s+[^|&;]*dynamo\.vllm[^|&;]*`)},
	{BackendFrameworkSGLang, regexp.MustCompile(`python[0-9.]*\s+[^|&;]*-m\s+[^|&;]*dynamo\.sglang[^|&;]*`)},
	{BackendFrameworkTRTLLM, regexp.MustCompile(`python[0-9.]*\s+[^|&;]*-m\s+[^|&;]*dynamo\.trtllm[^|&;]*`)},
}

// detectBackendFrameworkFromArgs detects the backend framework from command/args.
//
// command and args are joined with single spaces and matched against
// backendFrameworkCommandPatterns. It returns BackendFrameworkNoop when nothing matches,
// the matched framework when exactly one matches, and an error when several match.
func detectBackendFrameworkFromArgs(command []string, args []string) (BackendFramework, error) {
	// Build a fresh slice: append(command, args...) could write into spare capacity
	// of the caller's command slice and clobber data sharing its backing array.
	allParts := make([]string, 0, len(command)+len(args))
	allParts = append(allParts, command...)
	allParts = append(allParts, args...)
	fullCommand := strings.Join(allParts, " ")

	var detected []BackendFramework
	for _, candidate := range backendFrameworkCommandPatterns {
		if candidate.pattern.MatchString(fullCommand) {
			detected = append(detected, candidate.framework)
		}
	}

	switch len(detected) {
	case 0:
		return BackendFrameworkNoop, nil
	case 1:
		return detected[0], nil
	default:
		return "", fmt.Errorf("multiple backend frameworks detected from command: %v in %q", detected, fullCommand)
	}
}

// determineBackendFramework is the core logic for hybrid backend framework detection.
//
// Resolution order:
//  1. Components that are not workers (per IsWorkerComponent) get BackendFrameworkNoop.
//  2. For workers, a framework detected from command/args is preferred. If an explicit
//     framework is also configured and differs from it, an error is returned.
//  3. Otherwise the explicit framework is returned if set (its value is not validated here).
//  4. Otherwise, if detection failed (e.g. several frameworks matched), that error is
//     returned wrapped.
//  5. Otherwise BackendFrameworkNoop is returned.
func determineBackendFramework(
	componentType string,
	command []string,
	args []string,
	explicitBackendFramework string,
) (BackendFramework, error) {
	if !IsWorkerComponent(componentType) {
		return BackendFrameworkNoop, nil
	}

	// Worker component - try to detect from command/args
	var detectedFramework BackendFramework
	var detectionError error
	if len(command) > 0 || len(args) > 0 {
		if detected, err := detectBackendFrameworkFromArgs(command, args); err != nil {
			detectionError = err
		} else {
			detectedFramework = detected
		}
	}

	// An empty explicit value converts to "", meaning "not configured".
	explicitFramework := BackendFramework(explicitBackendFramework)

	// A Noop detection means no backend was recognized in the command; treat it like no detection.
	hasDetected := detectedFramework != "" && detectedFramework != BackendFrameworkNoop

	if hasDetected && explicitFramework != "" && detectedFramework != explicitFramework {
		return "", fmt.Errorf("backend framework mismatch: detected %q from command but explicitly configured as %q",
			detectedFramework, explicitFramework)
	}

	// Order of preference: detected > explicit > detection error > noop
	switch {
	case hasDetected:
		return detectedFramework, nil
	case explicitFramework != "":
		return explicitFramework, nil
	case detectionError != nil:
		return "", fmt.Errorf("could not determine backend framework: %w", detectionError)
	default:
		// No command/args to detect from and no explicit config
		return BackendFrameworkNoop, nil
	}
}

// getBackendFrameworkFromComponent attempts to determine backend framework using hybrid approach:
// 1. Check if component is a worker - if not, return noop
// 2. For workers: try to detect from command/args, fall back to explicit config
// 3. Return error if worker has neither detection nor explicit config
// Also validates consistency between detected and explicit if both exist
func getBackendFrameworkFromComponent(
1690
	component *v1alpha1.DynamoComponentDeploymentSharedSpec,
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
	dynamoDeployment *v1alpha1.DynamoGraphDeployment,
) (BackendFramework, error) {
	// Extract command/args from component
	var command, args []string
	if component.ExtraPodSpec != nil && component.ExtraPodSpec.MainContainer != nil {
		command = component.ExtraPodSpec.MainContainer.Command
		args = component.ExtraPodSpec.MainContainer.Args
	}

	// Extract explicit backend framework from deployment
	explicitBackendFramework := dynamoDeployment.Spec.BackendFramework

	return determineBackendFramework(
		component.ComponentType,
		command,
		args,
		explicitBackendFramework,
	)
}

// ConvertDynamoComponentDeploymentToSpec converts a DynamoComponentDeployment to our component spec interface
// This is a helper for the controller to use our backend logic
1713
1714
func ConvertDynamoComponentDeploymentToSpec(dynComponent *v1alpha1.DynamoComponentDeployment) *v1alpha1.DynamoComponentDeploymentSharedSpec {
	return dynComponent.Spec.DynamoComponentDeploymentSharedSpec.DeepCopy()
1715
1716
}

1717
1718
// GetBackendFrameworkFromDynamoComponent determines backend framework for a DynamoComponentDeployment
func GetBackendFrameworkFromDynamoComponent(dynComponent *v1alpha1.DynamoComponentDeployment) (BackendFramework, error) {
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
	// Extract command/args from component
	var command, args []string
	if dynComponent.Spec.ExtraPodSpec != nil && dynComponent.Spec.ExtraPodSpec.MainContainer != nil {
		command = dynComponent.Spec.ExtraPodSpec.MainContainer.Command
		args = dynComponent.Spec.ExtraPodSpec.MainContainer.Args
	}

	// Extract explicit backend framework
	explicitBackendFramework := dynComponent.Spec.BackendFramework

	return determineBackendFramework(
		dynComponent.Spec.ComponentType,
		command,
		args,
		explicitBackendFramework,
	)
}

// GenerateBasePodSpecForController generates a PodSpec using backend logic for controller usage
// This preserves the base pod generation while allowing controller-specific enhancements
func GenerateBasePodSpecForController(
	dynComponent *v1alpha1.DynamoComponentDeployment,
	secretsRetriever SecretsRetriever,
1742
	operatorConfig *configv1alpha1.OperatorConfiguration,
1743
1744
	role Role,
	multinodeDeploymentType commonconsts.MultinodeDeploymentType,
1745
	checkpointInfo *checkpoint.CheckpointInfo, // Optional checkpoint info (resolved by caller)
1746
) (*corev1.PodSpec, error) {
1747
1748
1749
	// Convert to our interface
	componentSpec := ConvertDynamoComponentDeploymentToSpec(dynComponent)

1750
	numberOfNodes := componentSpec.GetNumberOfNodes()
1751
1752

	// Determine backend framework using hybrid approach
1753
	backendFramework, err := GetBackendFrameworkFromDynamoComponent(dynComponent)
1754
	if err != nil {
1755
		return nil, fmt.Errorf("failed to determine backend framework: %w", err)
1756
1757
1758
	}

	// Generate base PodSpec with standard env vars using merged component envs
1759
1760
1761
1762
	serviceName := dynComponent.Spec.ServiceName
	if serviceName == "" {
		serviceName = dynComponent.Name
	}
1763
1764
1765
1766
	podSpec, err := GenerateBasePodSpec(
		componentSpec,
		backendFramework,
		secretsRetriever,
1767
		dynComponent.GetParentGraphDeploymentName(),
1768
1769
1770
		dynComponent.Namespace,
		role,
		numberOfNodes,
1771
		operatorConfig,
1772
1773
		multinodeDeploymentType,
		serviceName,
1774
		checkpointInfo,
1775
1776
	)
	if err != nil {
1777
		return nil, err
1778
1779
1780
1781
1782
	}

	return podSpec, nil
}

1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
// getDefaultCompilationCacheMountPoint returns the default mount point for compilation cache based on backend framework
func getDefaultCompilationCacheMountPoint(backendFramework BackendFramework) string {
	switch backendFramework {
	case BackendFrameworkVLLM:
		return commonconsts.DefaultVLLMCacheMountPoint
	case BackendFrameworkSGLang, BackendFrameworkTRTLLM:
		// SGLang and TensorRT-LLM don't currently support compilation caches
		// Return empty string as these should not be used
		return ""
	default:
		// For unknown backends, don't assume compilation cache support
		return ""
	}
}