Unverified Commit 3d67474e authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

feat(operator): DynamoGraphDeployment rollout restart mechanism (#5118)

parent f7ba417e
...@@ -10335,8 +10335,6 @@ spec: ...@@ -10335,8 +10335,6 @@ spec:
properties: properties:
conditions: conditions:
description: |- description: |-
INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
Important: Run "make" to regenerate code after modifying this file
Conditions captures the latest observed state of the component (including Conditions captures the latest observed state of the component (including
availability and readiness) using standard Kubernetes condition types. availability and readiness) using standard Kubernetes condition types.
items: items:
...@@ -10393,6 +10391,10 @@ spec: ...@@ -10393,6 +10391,10 @@ spec:
- type - type
type: object type: object
type: array type: array
observedGeneration:
description: ObservedGeneration is the most recent generation observed by the controller.
format: int64
type: integer
podSelector: podSelector:
additionalProperties: additionalProperties:
type: string type: string
......
...@@ -208,6 +208,34 @@ spec: ...@@ -208,6 +208,34 @@ spec:
rule: '!has(self.create) || self.create == false || (has(self.size) && has(self.storageClass) && has(self.volumeAccessMode))' rule: '!has(self.create) || self.create == false || (has(self.size) && has(self.storageClass) && has(self.volumeAccessMode))'
maxItems: 100 maxItems: 100
type: array type: array
restart:
description: Restart specifies the restart policy for the graph deployment.
properties:
id:
description: |-
ID is an arbitrary string that triggers a restart when changed.
Any modification to this value will initiate a restart of the graph deployment according to the strategy.
minLength: 1
type: string
strategy:
description: Strategy specifies the restart strategy for the graph deployment.
properties:
order:
description: Order specifies the order in which the services should be restarted.
items:
type: string
type: array
type:
default: Sequential
description: Type specifies the restart strategy type.
enum:
- Sequential
- Parallel
type: string
type: object
required:
- id
type: object
services: services:
additionalProperties: additionalProperties:
properties: properties:
...@@ -10530,6 +10558,23 @@ spec: ...@@ -10530,6 +10558,23 @@ spec:
- type - type
type: object type: object
type: array type: array
restart:
description: Restart contains the status of the restart of the graph deployment.
properties:
inProgress:
description: InProgress contains the names of the services that are currently being restarted.
items:
type: string
type: array
observedID:
description: |-
ObservedID is the restart ID that has been observed and is being processed.
Matches the Restart.ID field in the spec.
type: string
phase:
description: Phase is the phase of the restart.
type: string
type: object
services: services:
additionalProperties: additionalProperties:
description: ServiceReplicaStatus contains replica information for a single service. description: ServiceReplicaStatus contains replica information for a single service.
......
...@@ -172,6 +172,11 @@ func (i *IngressSpec) IsVirtualServiceEnabled() bool { ...@@ -172,6 +172,11 @@ func (i *IngressSpec) IsVirtualServiceEnabled() bool {
type DynamoComponentDeploymentStatus struct { type DynamoComponentDeploymentStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file // Important: Run "make" to regenerate code after modifying this file
// ObservedGeneration is the most recent generation observed by the controller.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
// Conditions captures the latest observed state of the component (including // Conditions captures the latest observed state of the component (including
// availability and readiness) using standard Kubernetes condition types. // availability and readiness) using standard Kubernetes condition types.
Conditions []metav1.Condition `json:"conditions"` Conditions []metav1.Condition `json:"conditions"`
......
...@@ -60,8 +60,42 @@ type DynamoGraphDeploymentSpec struct { ...@@ -60,8 +60,42 @@ type DynamoGraphDeploymentSpec struct {
// BackendFramework specifies the backend framework (e.g., "sglang", "vllm", "trtllm"). // BackendFramework specifies the backend framework (e.g., "sglang", "vllm", "trtllm").
// +kubebuilder:validation:Enum=sglang;vllm;trtllm // +kubebuilder:validation:Enum=sglang;vllm;trtllm
BackendFramework string `json:"backendFramework,omitempty"` BackendFramework string `json:"backendFramework,omitempty"`
// Restart specifies the restart policy for the graph deployment.
// +kubebuilder:validation:Optional
Restart *Restart `json:"restart,omitempty"`
}
// Restart describes a restart request for the graph deployment: an ID whose
// change triggers the restart, plus an optional strategy for carrying it out.
type Restart struct {
// ID is an arbitrary string that triggers a restart when changed.
// Any modification to this value will initiate a restart of the graph deployment according to the strategy.
// +kubebuilder:validation:Required
// +kubebuilder:validation:MinLength=1
ID string `json:"id"`
// Strategy specifies the restart strategy for the graph deployment.
// +kubebuilder:validation:Optional
Strategy *RestartStrategy `json:"strategy,omitempty"`
} }
// RestartStrategy configures how a restart is carried out: the strategy type
// and an optional ordering of the services to restart.
type RestartStrategy struct {
// Type specifies the restart strategy type.
// +kubebuilder:validation:Enum=Sequential;Parallel
// +kubebuilder:default=Sequential
Type RestartStrategyType `json:"type,omitempty"`
// Order specifies the order in which the services should be restarted.
// +kubebuilder:validation:Optional
Order []string `json:"order,omitempty"`
}
// RestartStrategyType names a restart strategy. The constants below are the
// values accepted by the kubebuilder Enum marker on RestartStrategy.Type.
type RestartStrategyType string
const (
// RestartStrategyTypeSequential is the "Sequential" strategy; it is also the
// kubebuilder default for RestartStrategy.Type.
RestartStrategyTypeSequential RestartStrategyType = "Sequential"
// RestartStrategyTypeParallel is the "Parallel" strategy.
RestartStrategyTypeParallel RestartStrategyType = "Parallel"
)
// DynamoGraphDeploymentStatus defines the observed state of DynamoGraphDeployment. // DynamoGraphDeploymentStatus defines the observed state of DynamoGraphDeployment.
type DynamoGraphDeploymentStatus struct { type DynamoGraphDeploymentStatus struct {
// State is a high-level textual status of the graph deployment lifecycle. // State is a high-level textual status of the graph deployment lifecycle.
...@@ -73,8 +107,33 @@ type DynamoGraphDeploymentStatus struct { ...@@ -73,8 +107,33 @@ type DynamoGraphDeploymentStatus struct {
// The map key is the service name from spec.services. // The map key is the service name from spec.services.
// +optional // +optional
Services map[string]ServiceReplicaStatus `json:"services,omitempty"` Services map[string]ServiceReplicaStatus `json:"services,omitempty"`
// Restart contains the status of the restart of the graph deployment.
// +optional
Restart *RestartStatus `json:"restart,omitempty"`
} }
// RestartStatus contains the status of the restart of the graph deployment.
// It records which restart ID is being tracked, the phase that restart is in,
// and the services that are still being restarted.
type RestartStatus struct {
// ObservedID is the restart ID that has been observed and is being processed.
// Matches the Restart.ID field in the spec.
ObservedID string `json:"observedID,omitempty"`
// Phase is the phase of the restart.
Phase RestartPhase `json:"phase,omitempty"`
// InProgress contains the names of the services that are currently being restarted.
// +optional
InProgress []string `json:"inProgress,omitempty"`
}
// RestartPhase is the phase reported in RestartStatus.Phase.
type RestartPhase string
const (
// RestartPhasePending is the "Pending" phase.
// NOTE(review): presumably a restart that was observed but not yet started — confirm.
RestartPhasePending RestartPhase = "Pending"
// RestartPhaseRestarting is the "Restarting" phase.
// NOTE(review): presumably set while services are being restarted — confirm.
RestartPhaseRestarting RestartPhase = "Restarting"
// RestartPhaseCompleted is the "Completed" phase.
RestartPhaseCompleted RestartPhase = "Completed"
// RestartPhaseFailed is the "Failed" phase.
RestartPhaseFailed RestartPhase = "Failed"
)
// ServiceReplicaStatus contains replica information for a single service. // ServiceReplicaStatus contains replica information for a single service.
type ServiceReplicaStatus struct { type ServiceReplicaStatus struct {
// ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet"). // ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet").
......
...@@ -751,6 +751,11 @@ func (in *DynamoGraphDeploymentSpec) DeepCopyInto(out *DynamoGraphDeploymentSpec ...@@ -751,6 +751,11 @@ func (in *DynamoGraphDeploymentSpec) DeepCopyInto(out *DynamoGraphDeploymentSpec
(*in)[i].DeepCopyInto(&(*out)[i]) (*in)[i].DeepCopyInto(&(*out)[i])
} }
} }
if in.Restart != nil {
in, out := &in.Restart, &out.Restart
*out = new(Restart)
(*in).DeepCopyInto(*out)
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentSpec. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentSpec.
...@@ -780,6 +785,11 @@ func (in *DynamoGraphDeploymentStatus) DeepCopyInto(out *DynamoGraphDeploymentSt ...@@ -780,6 +785,11 @@ func (in *DynamoGraphDeploymentStatus) DeepCopyInto(out *DynamoGraphDeploymentSt
(*out)[key] = *val.DeepCopy() (*out)[key] = *val.DeepCopy()
} }
} }
if in.Restart != nil {
in, out := &in.Restart, &out.Restart
*out = new(RestartStatus)
(*in).DeepCopyInto(*out)
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentStatus. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentStatus.
...@@ -1211,6 +1221,66 @@ func (in *Resources) DeepCopy() *Resources { ...@@ -1211,6 +1221,66 @@ func (in *Resources) DeepCopy() *Resources {
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Restart) DeepCopyInto(out *Restart) {
*out = *in
if in.Strategy != nil {
in, out := &in.Strategy, &out.Strategy
*out = new(RestartStrategy)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Restart.
func (in *Restart) DeepCopy() *Restart {
if in == nil {
return nil
}
out := new(Restart)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RestartStatus) DeepCopyInto(out *RestartStatus) {
*out = *in
if in.InProgress != nil {
in, out := &in.InProgress, &out.InProgress
*out = make([]string, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RestartStatus.
func (in *RestartStatus) DeepCopy() *RestartStatus {
if in == nil {
return nil
}
out := new(RestartStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RestartStrategy) DeepCopyInto(out *RestartStrategy) {
*out = *in
if in.Order != nil {
in, out := &in.Order, &out.Order
*out = make([]string, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RestartStrategy.
func (in *RestartStrategy) DeepCopy() *RestartStrategy {
if in == nil {
return nil
}
out := new(RestartStrategy)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ScalingAdapter) DeepCopyInto(out *ScalingAdapter) { func (in *ScalingAdapter) DeepCopyInto(out *ScalingAdapter) {
*out = *in *out = *in
......
...@@ -10335,8 +10335,6 @@ spec: ...@@ -10335,8 +10335,6 @@ spec:
properties: properties:
conditions: conditions:
description: |- description: |-
INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
Important: Run "make" to regenerate code after modifying this file
Conditions captures the latest observed state of the component (including Conditions captures the latest observed state of the component (including
availability and readiness) using standard Kubernetes condition types. availability and readiness) using standard Kubernetes condition types.
items: items:
...@@ -10393,6 +10391,10 @@ spec: ...@@ -10393,6 +10391,10 @@ spec:
- type - type
type: object type: object
type: array type: array
observedGeneration:
description: ObservedGeneration is the most recent generation observed by the controller.
format: int64
type: integer
podSelector: podSelector:
additionalProperties: additionalProperties:
type: string type: string
......
...@@ -208,6 +208,34 @@ spec: ...@@ -208,6 +208,34 @@ spec:
rule: '!has(self.create) || self.create == false || (has(self.size) && has(self.storageClass) && has(self.volumeAccessMode))' rule: '!has(self.create) || self.create == false || (has(self.size) && has(self.storageClass) && has(self.volumeAccessMode))'
maxItems: 100 maxItems: 100
type: array type: array
restart:
description: Restart specifies the restart policy for the graph deployment.
properties:
id:
description: |-
ID is an arbitrary string that triggers a restart when changed.
Any modification to this value will initiate a restart of the graph deployment according to the strategy.
minLength: 1
type: string
strategy:
description: Strategy specifies the restart strategy for the graph deployment.
properties:
order:
description: Order specifies the order in which the services should be restarted.
items:
type: string
type: array
type:
default: Sequential
description: Type specifies the restart strategy type.
enum:
- Sequential
- Parallel
type: string
type: object
required:
- id
type: object
services: services:
additionalProperties: additionalProperties:
properties: properties:
...@@ -10530,6 +10558,23 @@ spec: ...@@ -10530,6 +10558,23 @@ spec:
- type - type
type: object type: object
type: array type: array
restart:
description: Restart contains the status of the restart of the graph deployment.
properties:
inProgress:
description: InProgress contains the names of the services that are currently being restarted.
items:
type: string
type: array
observedID:
description: |-
ObservedID is the restart ID that has been observed and is being processed.
Matches the Restart.ID field in the spec.
type: string
phase:
description: Phase is the phase of the restart.
type: string
type: object
services: services:
additionalProperties: additionalProperties:
description: ServiceReplicaStatus contains replica information for a single service. description: ServiceReplicaStatus contains replica information for a single service.
......
...@@ -95,6 +95,8 @@ const ( ...@@ -95,6 +95,8 @@ const (
GroveRoleSuffixWorker = "wkr" GroveRoleSuffixWorker = "wkr"
MainContainerName = "main" MainContainerName = "main"
RestartAnnotation = "nvidia.com/restartAt"
) )
type MultinodeDeploymentType string type MultinodeDeploymentType string
......
...@@ -307,6 +307,16 @@ func (r *DynamoComponentDeploymentReconciler) reconcileDeploymentResources(ctx c ...@@ -307,6 +307,16 @@ func (r *DynamoComponentDeploymentReconciler) reconcileDeploymentResources(ctx c
return ComponentReconcileResult{}, fmt.Errorf("failed to create or update the deployment: %w", err) return ComponentReconcileResult{}, fmt.Errorf("failed to create or update the deployment: %w", err)
} }
logger.V(1).Info("Deployment sync completed",
"deploymentModified", deploymentModified,
"deploymentName", deployment.Name,
"deploymentGeneration", deployment.Generation,
"deploymentObservedGeneration", deployment.Status.ObservedGeneration,
"deploymentReplicas", deployment.Status.Replicas,
"deploymentUpdatedReplicas", deployment.Status.UpdatedReplicas,
"deploymentAvailableReplicas", deployment.Status.AvailableReplicas,
"deploymentReadyReplicas", deployment.Status.ReadyReplicas)
serviceReplicaStatus := &v1alpha1.ServiceReplicaStatus{ serviceReplicaStatus := &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: deployment.Name, ComponentName: deployment.Name,
...@@ -317,7 +327,6 @@ func (r *DynamoComponentDeploymentReconciler) reconcileDeploymentResources(ctx c ...@@ -317,7 +327,6 @@ func (r *DynamoComponentDeploymentReconciler) reconcileDeploymentResources(ctx c
} }
if IsDeploymentReady(deployment) { if IsDeploymentReady(deployment) {
logger.Info("Deployment is ready. Setting available status condition to true.")
return ComponentReconcileResult{ return ComponentReconcileResult{
modified: deploymentModified, modified: deploymentModified,
status: metav1.ConditionTrue, status: metav1.ConditionTrue,
...@@ -458,6 +467,7 @@ func (r *DynamoComponentDeploymentReconciler) setStatusConditionAndServiceReplic ...@@ -458,6 +467,7 @@ func (r *DynamoComponentDeploymentReconciler) setStatusConditionAndServiceReplic
meta.SetStatusCondition(&dynamoComponentDeployment.Status.Conditions, condition) meta.SetStatusCondition(&dynamoComponentDeployment.Status.Conditions, condition)
dynamoComponentDeployment.Status.Service = componentReconcileResult.serviceReplicaStatus dynamoComponentDeployment.Status.Service = componentReconcileResult.serviceReplicaStatus
dynamoComponentDeployment.Status.ObservedGeneration = dynamoComponentDeployment.Generation
err := r.Status().Update(ctx, dynamoComponentDeployment) err := r.Status().Update(ctx, dynamoComponentDeployment)
if err != nil { if err != nil {
...@@ -763,9 +773,11 @@ func IsDeploymentReady(deployment *appsv1.Deployment) bool { ...@@ -763,9 +773,11 @@ func IsDeploymentReady(deployment *appsv1.Deployment) bool {
// 1. ObservedGeneration: Deployment controller has observed the latest configuration // 1. ObservedGeneration: Deployment controller has observed the latest configuration
// 2. UpdatedReplicas: All replicas have been updated to the latest version // 2. UpdatedReplicas: All replicas have been updated to the latest version
// 3. AvailableReplicas: All desired replicas are available (schedulable and healthy) // 3. AvailableReplicas: All desired replicas are available (schedulable and healthy)
// 4. Replicas: Total replicas equals desired (no surge pods remaining from rolling update)
if status.ObservedGeneration < deployment.Generation || if status.ObservedGeneration < deployment.Generation ||
status.UpdatedReplicas < desiredReplicas || status.UpdatedReplicas < desiredReplicas ||
status.AvailableReplicas < desiredReplicas { status.AvailableReplicas < desiredReplicas ||
status.Replicas != desiredReplicas {
return false return false
} }
// Finally, check for the DeploymentAvailable condition // Finally, check for the DeploymentAvailable condition
...@@ -1234,6 +1246,12 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex ...@@ -1234,6 +1246,12 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
maps.Copy(podLabels, extraPodMetadata.Labels) maps.Copy(podLabels, extraPodMetadata.Labels)
} }
// Propagate restart annotation to pod template to trigger rolling restart
// This is the same mechanism used by kubectl rollout restart
if restartAt, exists := resourceAnnotations[commonconsts.RestartAnnotation]; exists {
podAnnotations[commonconsts.RestartAnnotation] = restartAt
}
if podSpec.ServiceAccountName == "" { if podSpec.ServiceAccountName == "" {
serviceAccounts := &corev1.ServiceAccountList{} serviceAccounts := &corev1.ServiceAccountList{}
err = r.List(ctx, serviceAccounts, client.InNamespace(opt.dynamoComponentDeployment.Namespace), client.MatchingLabels{ err = r.List(ctx, serviceAccounts, client.InNamespace(opt.dynamoComponentDeployment.Namespace), client.MatchingLabels{
......
...@@ -93,6 +93,26 @@ func TestIsDeploymentReady(t *testing.T) { ...@@ -93,6 +93,26 @@ func TestIsDeploymentReady(t *testing.T) {
}, },
want: false, want: false,
}, },
{
name: "not ready (surging)",
args: args{
deployment: &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
},
Spec: appsv1.DeploymentSpec{
Replicas: &[]int32{2}[0],
},
Status: appsv1.DeploymentStatus{
ObservedGeneration: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
Replicas: 2,
},
},
},
want: false,
},
{ {
name: "ready", name: "ready",
args: args{ args: args{
...@@ -107,6 +127,7 @@ func TestIsDeploymentReady(t *testing.T) { ...@@ -107,6 +127,7 @@ func TestIsDeploymentReady(t *testing.T) {
ObservedGeneration: 1, ObservedGeneration: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
AvailableReplicas: 1, AvailableReplicas: 1,
Replicas: 1,
Conditions: []appsv1.DeploymentCondition{ Conditions: []appsv1.DeploymentCondition{
{ {
Type: appsv1.DeploymentAvailable, Type: appsv1.DeploymentAvailable,
...@@ -1855,6 +1876,7 @@ func Test_setStatusConditionAndServiceReplicaStatus(t *testing.T) { ...@@ -1855,6 +1876,7 @@ func Test_setStatusConditionAndServiceReplicaStatus(t *testing.T) {
wantConditionReason string wantConditionReason string
wantConditionMessage string wantConditionMessage string
wantServiceReplicaStatus *v1alpha1.ServiceReplicaStatus wantServiceReplicaStatus *v1alpha1.ServiceReplicaStatus
wantObservedGeneration int64
}{ }{
{ {
name: "deployment backed DCD that is unready", name: "deployment backed DCD that is unready",
...@@ -1976,10 +1998,12 @@ func Test_setStatusConditionAndServiceReplicaStatus(t *testing.T) { ...@@ -1976,10 +1998,12 @@ func Test_setStatusConditionAndServiceReplicaStatus(t *testing.T) {
g.Expect(err).NotTo(gomega.HaveOccurred()) g.Expect(err).NotTo(gomega.HaveOccurred())
// Create DynamoComponentDeployment // Create DynamoComponentDeployment
generation := int64(5)
dcd := &v1alpha1.DynamoComponentDeployment{ dcd := &v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-component", Name: "test-component",
Namespace: "default", Namespace: "default",
Generation: generation,
}, },
Spec: v1alpha1.DynamoComponentDeploymentSpec{ Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: string(dynamo.BackendFrameworkVLLM), BackendFramework: string(dynamo.BackendFrameworkVLLM),
...@@ -2031,6 +2055,9 @@ func Test_setStatusConditionAndServiceReplicaStatus(t *testing.T) { ...@@ -2031,6 +2055,9 @@ func Test_setStatusConditionAndServiceReplicaStatus(t *testing.T) {
// Assert the service replica status // Assert the service replica status
g.Expect(updatedDCD.Status.Service).To(gomega.Equal(tt.wantServiceReplicaStatus)) g.Expect(updatedDCD.Status.Service).To(gomega.Equal(tt.wantServiceReplicaStatus))
// Assert the observed generation
g.Expect(updatedDCD.Status.ObservedGeneration).To(gomega.Equal(generation))
}) })
} }
} }
......
...@@ -186,16 +186,19 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr ...@@ -186,16 +186,19 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
reconcileResult, err := r.reconcileResources(ctx, dynamoDeployment) reconcileResult, err := r.reconcileResources(ctx, dynamoDeployment)
state = reconcileResult.State state = reconcileResult.State
reason = reconcileResult.Reason reason = reconcileResult.Reason
message = reconcileResult.Message message = reconcileResult.Message
dynamoDeployment.Status.Services = reconcileResult.ServiceStatus dynamoDeployment.Status.Services = reconcileResult.ServiceStatus
dynamoDeployment.Status.Restart = reconcileResult.RestartStatus
if err != nil { if err != nil {
logger.Error(err, "failed to reconcile the resources") logger.Error(err, "failed to reconcile the resources")
reason = "failed_to_reconcile_the_resources" reason = "failed_to_reconcile_the_resources"
return ctrl.Result{}, err return ctrl.Result{}, err
} }
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
...@@ -210,6 +213,7 @@ type ReconcileResult struct { ...@@ -210,6 +213,7 @@ type ReconcileResult struct {
Reason Reason Reason Reason
Message Message Message Message
ServiceStatus map[string]nvidiacomv1alpha1.ServiceReplicaStatus ServiceStatus map[string]nvidiacomv1alpha1.ServiceReplicaStatus
RestartStatus *nvidiacomv1alpha1.RestartStatus
} }
func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) { func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) {
...@@ -255,14 +259,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context ...@@ -255,14 +259,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
return ReconcileResult{}, fmt.Errorf("failed to reconcile K8s discovery resources: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile K8s discovery resources: %w", err)
} }
// Orchestrator selection via single boolean annotation: nvidia.com/enable-grove
// Unset or not "false": Grove if available; else component mode
// "false": component mode (multinode -> LWS; single-node -> standard)
enableGrove := true
if dynamoDeployment.Annotations != nil && strings.ToLower(dynamoDeployment.Annotations[consts.KubeAnnotationEnableGrove]) == consts.KubeLabelValueFalse {
enableGrove = false
}
// Determine if any service is multinode // Determine if any service is multinode
hasMultinode := dynamoDeployment.HasAnyMultinodeService() hasMultinode := dynamoDeployment.HasAnyMultinodeService()
...@@ -275,18 +271,110 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context ...@@ -275,18 +271,110 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
} }
} }
if enableGrove && r.Config.Grove.Enabled { // return error early if Grove and LWS is not available for multinode
logger.Info("Reconciling Grove resources", "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled, "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled) if !r.isGrovePathway(dynamoDeployment) && hasMultinode && !r.Config.LWS.Enabled {
return r.reconcileGroveResources(ctx, dynamoDeployment)
}
if hasMultinode && !r.Config.LWS.Enabled {
err := fmt.Errorf("no multinode orchestrator available") err := fmt.Errorf("no multinode orchestrator available")
logger.Error(err, err.Error(), "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled) logger.Error(err, err.Error(), "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled)
return ReconcileResult{}, fmt.Errorf("failed to reconcile Dynamo components deployments: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile Dynamo components deployments: %w", err)
} }
logger.Info("Reconciling Dynamo components deployments", "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled)
return r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment)
restartStatus := r.computeRestartStatus(ctx, dynamoDeployment)
restartState := dynamo.DetermineRestartState(dynamoDeployment, restartStatus)
var result ReconcileResult
if r.isGrovePathway(dynamoDeployment) {
logger.Info("Reconciling Grove resources", "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled)
result, err = r.reconcileGroveResources(ctx, dynamoDeployment, restartState)
} else {
logger.Info("Reconciling Dynamo components deployments", "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled)
result, err = r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment, restartState)
}
if err != nil {
logger.Error(err, "Failed to reconcile Dynamo components deployments")
return ReconcileResult{}, fmt.Errorf("failed to reconcile Dynamo components deployments: %w", err)
}
result.RestartStatus = restartStatus
return result, nil
}
// isGrovePathway reports whether the given graph deployment is reconciled
// through Grove rather than through component deployments.
//
// Grove is selected when it is enabled in the operator configuration and the
// deployment has not opted out through the nvidia.com/enable-grove annotation.
// Only a value equal to "false" (compared case-insensitively via lower-casing)
// opts out; an unset annotation or any other value keeps Grove selected.
func (r *DynamoGraphDeploymentReconciler) isGrovePathway(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) bool {
	optedOut := dgd.Annotations != nil &&
		strings.ToLower(dgd.Annotations[consts.KubeAnnotationEnableGrove]) == consts.KubeLabelValueFalse
	return !optedOut && r.Config.Grove.Enabled
}
// getUpdatedInProgress returns the services from inProgress that are still
// being restarted, delegating to the Grove or component implementation
// depending on which pathway the deployment uses.
func (r *DynamoGraphDeploymentReconciler) getUpdatedInProgress(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, inProgress []string) []string {
	if !r.isGrovePathway(dgd) {
		return r.getUpdatedInProgressForComponent(ctx, dgd, inProgress)
	}
	return r.getUpdatedInProgressForGrove(ctx, dgd, inProgress)
}
// getUpdatedInProgressForGrove checks which services are still in progress on
// the Grove pathway and returns their names.
//
// The input slice is returned unchanged when the PodCliqueSet named after the
// deployment cannot be fetched, has no observedGeneration yet, or has an
// observedGeneration older than its generation. Otherwise each service in
// inProgress is checked for readiness and only the ones that are not ready
// are returned, in a freshly allocated slice.
func (r *DynamoGraphDeploymentReconciler) getUpdatedInProgressForGrove(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, inProgress []string) []string {
	logger := log.FromContext(ctx)

	pcs := &grovev1alpha1.PodCliqueSet{}
	if err := r.Client.Get(ctx, types.NamespacedName{Name: dgd.Name, Namespace: dgd.Namespace}, pcs); err != nil {
		logger.Error(err, "failed to get PodCliqueSet")
		return inProgress
	}

	// The logger takes a constant message followed by key/value pairs, not a
	// printf-style format string, so the values are passed as structured fields.
	if pcs.Status.ObservedGeneration == nil {
		logger.Info("PodCliqueSet observedGeneration is nil", "podCliqueSet", dgd.Name)
		return inProgress
	}

	if *pcs.Status.ObservedGeneration < pcs.Generation {
		logger.Info("PodCliqueSet not yet reconciled",
			"podCliqueSet", dgd.Name,
			"generation", pcs.Generation,
			"observedGeneration", *pcs.Status.ObservedGeneration)
		return inProgress
	}

	updatedInProgress := make([]string, 0, len(inProgress))
	for _, serviceName := range inProgress {
		// NOTE(review): a name missing from spec.services yields the map's zero
		// value here; confirm GetNumberOfNodes tolerates that.
		component := dgd.Spec.Services[serviceName]
		// NOTE(review): the "-0-" segment presumably addresses replica index 0
		// of the PodCliqueSet — confirm against the Grove naming scheme.
		resourceName := fmt.Sprintf("%s-0-%s", dgd.Name, strings.ToLower(serviceName))

		var isReady bool
		var reason string
		// Services spanning more than one node are checked with CheckPCSGReady,
		// all others with CheckPodCliqueReady. The third return value of both
		// helpers is not needed here.
		if component.GetNumberOfNodes() > 1 {
			isReady, reason, _ = dynamo.CheckPCSGReady(ctx, r.Client, resourceName, dgd.Namespace, logger)
		} else {
			isReady, reason, _ = dynamo.CheckPodCliqueReady(ctx, r.Client, resourceName, dgd.Namespace, logger)
		}
		if !isReady {
			logger.V(1).Info("service not ready", "serviceName", serviceName, "resourceName", resourceName, "reason", reason)
			updatedInProgress = append(updatedInProgress, serviceName)
		}
	}

	return updatedInProgress
}
func isRestartAlreadyProcessed(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) bool {
if dgd.Spec.Restart == nil || dgd.Spec.Restart.ID == "" {
return true
}
if dgd.Status.Restart == nil || dgd.Status.Restart.ObservedID == "" {
return false
}
if dgd.Spec.Restart.ID == dgd.Status.Restart.ObservedID &&
(dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseCompleted ||
dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseFailed) {
return true
}
return false
} }
// scaleGroveResource scales a Grove resource using the generic scaling function // scaleGroveResource scales a Grove resource using the generic scaling function
...@@ -315,11 +403,17 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context ...@@ -315,11 +403,17 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context
return err return err
} }
func (r *DynamoGraphDeploymentReconciler) reconcileGrovePodCliqueSet(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (*commoncontroller.Resource, error) { func (r *DynamoGraphDeploymentReconciler) reconcileGrovePodCliqueSet(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, restartState *dynamo.RestartState) (*commoncontroller.Resource, error) {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
existingRestartAnnotations, err := r.getExistingRestartAnnotationsPCS(ctx, dynamoDeployment)
if err != nil {
logger.Error(err, "failed to get existing restart annotations")
return nil, fmt.Errorf("failed to get existing restart annotations: %w", err)
}
// generate the dynamoComponentsDeployments from the config // generate the dynamoComponentsDeployments from the config
grovePodCliqueSet, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever) grovePodCliqueSet, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever, restartState, existingRestartAnnotations)
if err != nil { if err != nil {
logger.Error(err, "failed to generate the Grove GangSet") logger.Error(err, "failed to generate the Grove GangSet")
return nil, fmt.Errorf("failed to generate the Grove GangSet: %w", err) return nil, fmt.Errorf("failed to generate the Grove GangSet: %w", err)
...@@ -349,6 +443,28 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGrovePodCliqueSet(ctx context ...@@ -349,6 +443,28 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGrovePodCliqueSet(ctx context
return syncedGrovePodCliqueSetAsResource, nil return syncedGrovePodCliqueSetAsResource, nil
} }
// getExistingRestartAnnotationsPCS reads the PodCliqueSet that shares the DGD's
// name and namespace and collects the restart annotation value of each clique
// template, keyed by the clique's Dynamo component label.
//
// A PodCliqueSet that does not exist yields an empty (non-nil) map. Any other
// lookup error is returned wrapped. Cliques lacking either the restart
// annotation or the component label are ignored.
func (r *DynamoGraphDeploymentReconciler) getExistingRestartAnnotationsPCS(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) (map[string]string, error) {
	found := map[string]string{}

	pcs := &grovev1alpha1.PodCliqueSet{}
	key := types.NamespacedName{Name: dgd.Name, Namespace: dgd.Namespace}
	if err := r.Client.Get(ctx, key, pcs); err != nil {
		if errors.IsNotFound(err) {
			return found, nil
		}
		return nil, fmt.Errorf("failed to get PodCliqueSet: %w", err)
	}

	for _, clique := range pcs.Spec.Template.Cliques {
		// Indexing a nil map is safe in Go, so no explicit nil check is needed.
		timestamp, annotated := clique.Annotations[consts.RestartAnnotation]
		if !annotated {
			continue
		}
		serviceName, labeled := clique.Labels[consts.KubeLabelDynamoComponent]
		if !labeled {
			continue
		}
		found[serviceName] = timestamp
	}
	return found, nil
}
// reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes // reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes
func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error { func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
...@@ -397,10 +513,10 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Cont ...@@ -397,10 +513,10 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Cont
return nil return nil
} }
func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) { func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, restartState *dynamo.RestartState) (ReconcileResult, error) {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
grovePodCliqueSetAsResource, err := r.reconcileGrovePodCliqueSet(ctx, dynamoDeployment) grovePodCliqueSetAsResource, err := r.reconcileGrovePodCliqueSet(ctx, dynamoDeployment, restartState)
if err != nil { if err != nil {
logger.Error(err, "failed to reconcile the Grove PodClique Set") logger.Error(err, "failed to reconcile the Grove PodClique Set")
return ReconcileResult{}, fmt.Errorf("failed to reconcile the Grove PodClique Set: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile the Grove PodClique Set: %w", err)
...@@ -510,7 +626,253 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -510,7 +626,253 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
} }
} }
} }
return r.checkResourcesReadiness(resources), nil
// Check resource readiness
result := r.checkResourcesReadiness(resources)
return result, nil
}
// isNewRestartRequest checks if the current spec.restart.id represents a new restart request.
//
// It returns true when no restart has been observed yet (status.restart is nil
// or its observedID is empty), when no restart ID is set in the spec
// (spec.restart is nil or its id is empty), or when the requested ID differs
// from the observed one. It returns false only when both IDs are non-empty and
// equal.
func isNewRestartRequest(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) bool {
	if dgd.Status.Restart == nil || dgd.Status.Restart.ObservedID == "" {
		return true
	}
	// A missing spec.restart is handled exactly like an empty ID. Without the
	// nil check the field access below would panic for a DGD that still has a
	// recorded observedID but no restart section in its spec.
	if dgd.Spec.Restart == nil || dgd.Spec.Restart.ID == "" {
		return true
	}
	return dgd.Spec.Restart.ID != dgd.Status.Restart.ObservedID
}
// computeParallelRestartStatus computes the restart status for the parallel
// strategy, in which every service is restarted at the same time.
//
// The set of services to inspect is:
//   - every service in the spec (sorted by name) for a new restart request, or
//     when the recorded status has no in-progress entries;
//   - the recorded in-progress list when continuing a restart with the same ID.
//
// The result is Completed when nothing remains to check or none of the checked
// services is reported as still in progress; otherwise it is Restarting with
// the services that are still in progress.
func (r *DynamoGraphDeploymentReconciler) computeParallelRestartStatus(
	ctx context.Context,
	dgd *nvidiacomv1alpha1.DynamoGraphDeployment,
) *nvidiacomv1alpha1.RestartStatus {
	logger := log.FromContext(ctx)
	specID := dgd.Spec.Restart.ID

	// allServices lists every service name from the spec in sorted order so the
	// resulting status is deterministic despite random map iteration.
	allServices := func() []string {
		names := make([]string, 0, len(dgd.Spec.Services))
		for name := range dgd.Spec.Services {
			names = append(names, name)
		}
		sort.Strings(names)
		return names
	}

	var pending []string
	switch {
	case isNewRestartRequest(dgd):
		logger.Info("New restart request detected, resetting to all services", "specID", specID)
		pending = allServices()
	case dgd.Status.Restart != nil && len(dgd.Status.Restart.InProgress) > 0:
		// Same restart ID as before: keep tracking the recorded in-progress list.
		pending = dgd.Status.Restart.InProgress
	default:
		// Same restart ID but nothing recorded as in progress: check everything.
		pending = allServices()
	}

	if len(pending) > 0 {
		if remaining := r.getUpdatedInProgress(ctx, dgd, pending); len(remaining) > 0 {
			return &nvidiacomv1alpha1.RestartStatus{
				ObservedID: specID,
				Phase:      nvidiacomv1alpha1.RestartPhaseRestarting,
				InProgress: remaining,
			}
		}
		logger.Info("Restart completed for all services")
	}

	return &nvidiacomv1alpha1.RestartStatus{
		ObservedID: specID,
		Phase:      nvidiacomv1alpha1.RestartPhaseCompleted,
	}
}
// computeSequentialRestartStatus handles sequential restart where services restart one at a time.
//
// order is the sequence in which services are restarted. The returned status
// always carries the spec's restart ID as ObservedID and is one of:
//   - Completed, when order is empty or the last service in order has finished;
//   - Restarting with a single in-progress service otherwise: the first service
//     for a new request (or when nothing is recorded as in progress), the
//     current service while it is still restarting, or the next service in
//     order once the current one is done.
//
// The caller must ensure dgd.Spec.Restart is non-nil.
func (r *DynamoGraphDeploymentReconciler) computeSequentialRestartStatus(
	ctx context.Context,
	dgd *nvidiacomv1alpha1.DynamoGraphDeployment,
	order []string,
) *nvidiacomv1alpha1.RestartStatus {
	logger := log.FromContext(ctx)

	specID := dgd.Spec.Restart.ID

	// With no services to restart there is nothing to wait for. Report
	// completion (as the parallel strategy does for an empty service list)
	// instead of indexing order[0] below, which would panic.
	if len(order) == 0 {
		return &nvidiacomv1alpha1.RestartStatus{
			ObservedID: specID,
			Phase:      nvidiacomv1alpha1.RestartPhaseCompleted,
		}
	}

	// Get the current service being restarted from previous status
	var currentService string
	if isNewRestartRequest(dgd) {
		// New restart request: start fresh from the first service
		logger.Info("New restart request detected, starting from first service", "specID", specID, "firstService", order[0])
		currentService = order[0]
		return &nvidiacomv1alpha1.RestartStatus{
			ObservedID: specID,
			Phase:      nvidiacomv1alpha1.RestartPhaseRestarting,
			InProgress: []string{currentService},
		}
	}

	if dgd.Status.Restart != nil && len(dgd.Status.Restart.InProgress) > 0 {
		currentService = dgd.Status.Restart.InProgress[0] // For sequential, there's only one
	}

	// If no current service, we're starting fresh - use the first service
	if currentService == "" {
		currentService = order[0]
		return &nvidiacomv1alpha1.RestartStatus{
			ObservedID: specID,
			Phase:      nvidiacomv1alpha1.RestartPhaseRestarting,
			InProgress: []string{currentService},
		}
	}

	// Check if the current service is fully updated
	updatedInProgress := r.getUpdatedInProgress(ctx, dgd, []string{currentService})

	if len(updatedInProgress) > 0 {
		// Still restarting
		logger.Info("Service restart not completed", "service", currentService, "updatedInProgress", updatedInProgress)
		return &nvidiacomv1alpha1.RestartStatus{
			ObservedID: specID,
			Phase:      nvidiacomv1alpha1.RestartPhaseRestarting,
			InProgress: []string{currentService},
		}
	}

	// Current service is fully updated - it's done
	logger.Info("Service restart completed", "service", currentService)

	// Find the next service. An empty result means currentService is the last
	// entry of order or does not appear in it at all.
	nextService := getNextServiceInOrder(order, currentService)
	if nextService == "" {
		// No more services, restart is complete
		logger.Info("Restart completed for all services")
		return &nvidiacomv1alpha1.RestartStatus{
			ObservedID: specID,
			Phase:      nvidiacomv1alpha1.RestartPhaseCompleted,
		}
	}

	// Move to the next service
	logger.Info("Starting next service restart", "service", nextService)
	return &nvidiacomv1alpha1.RestartStatus{
		ObservedID: specID,
		Phase:      nvidiacomv1alpha1.RestartPhaseRestarting,
		InProgress: []string{nextService},
	}
}
// getNextServiceInOrder returns the entry that directly follows the first
// occurrence of currentService in order. It returns the empty string when
// currentService is the last entry or does not appear in order.
func getNextServiceInOrder(order []string, currentService string) string {
	// Stop one short of the end: the final entry has no successor.
	for idx := 0; idx+1 < len(order); idx++ {
		if order[idx] == currentService {
			return order[idx+1]
		}
	}
	return ""
}
// computeRestartStatus derives the restart status to record for the DGD.
//
//   - No restart requested (spec.restart nil or empty ID): the existing status
//     is kept only if it is terminal (Completed or Failed); otherwise nil.
//   - Requested restart already processed: the existing status is returned
//     unchanged.
//   - Otherwise the status is computed by the parallel or the sequential
//     strategy, depending on dynamo.IsParallelRestart.
func (r *DynamoGraphDeploymentReconciler) computeRestartStatus(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) *nvidiacomv1alpha1.RestartStatus {
	restartRequested := dgd.Spec.Restart != nil && dgd.Spec.Restart.ID != ""
	if !restartRequested {
		previous := dgd.Status.Restart
		if previous == nil {
			return nil
		}
		// Only a terminal status outlives the removal of the restart request.
		switch previous.Phase {
		case nvidiacomv1alpha1.RestartPhaseCompleted, nvidiacomv1alpha1.RestartPhaseFailed:
			return previous
		}
		return nil
	}

	// Nothing to recompute once this restart ID reached a terminal phase.
	if isRestartAlreadyProcessed(dgd) {
		return dgd.Status.Restart
	}

	order := dynamo.GetRestartOrder(dgd)
	if !dynamo.IsParallelRestart(dgd) {
		return r.computeSequentialRestartStatus(ctx, dgd, order)
	}
	return r.computeParallelRestartStatus(ctx, dgd)
}
// checkComponentServiceFullyUpdated resolves the DynamoComponentDeployment name
// for serviceName within dgd and delegates to checkDCDReady, returning its
// ready flag and reason string unchanged.
func (r *DynamoGraphDeploymentReconciler) checkComponentServiceFullyUpdated(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, serviceName string) (bool, string) {
	return checkDCDReady(ctx, r.Client, dynamo.GetDynamoComponentName(dgd, serviceName), dgd.Namespace)
}
// checkDCDReady reports whether the named DynamoComponentDeployment has
// completed its rollout.
//
// It returns (true, "") only when both of the following hold:
//  1. status.observedGeneration is not behind metadata.generation, and
//  2. the first condition of type Available has status True.
//
// In every other case it returns false with a short reason: the resource was
// not found, the lookup failed, the spec has not been processed yet, the
// Available condition is not True, or the Available condition is absent.
func checkDCDReady(ctx context.Context, client client.Client, resourceName, namespace string) (bool, string) {
	logger := log.FromContext(ctx)

	dcd := &nvidiacomv1alpha1.DynamoComponentDeployment{}
	key := types.NamespacedName{Name: resourceName, Namespace: namespace}
	if err := client.Get(ctx, key, dcd); err != nil {
		if !errors.IsNotFound(err) {
			logger.V(1).Info("Failed to get DynamoComponentDeployment", "error", err, "resourceName", resourceName)
			return false, fmt.Sprintf("get error: %v", err)
		}
		logger.V(2).Info("DynamoComponentDeployment not found", "resourceName", resourceName)
		return false, "resource not found"
	}

	generation := dcd.Generation
	observedGeneration := dcd.Status.ObservedGeneration

	// Log the DCD status for debugging
	logger.Info("CheckDCDFullyUpdated",
		"resourceName", resourceName,
		"generation", generation,
		"observedGeneration", observedGeneration,
		"conditionCount", len(dcd.Status.Conditions))

	// The status is stale until the observed generation catches up with the spec.
	if observedGeneration < generation {
		logger.V(1).Info("DynamoComponentDeployment spec not yet processed",
			"resourceName", resourceName,
			"generation", generation,
			"observedGeneration", observedGeneration)
		return false, fmt.Sprintf("spec not yet processed: generation=%d, observedGeneration=%d", generation, observedGeneration)
	}

	// Locate the first Available condition; only that one is consulted.
	var available *metav1.Condition
	for i := range dcd.Status.Conditions {
		if dcd.Status.Conditions[i].Type == nvidiacomv1alpha1.DynamoGraphDeploymentConditionTypeAvailable {
			available = &dcd.Status.Conditions[i]
			break
		}
	}

	if available == nil {
		logger.V(1).Info("DynamoComponentDeployment missing Available condition", "resourceName", resourceName)
		return false, "Available condition not found"
	}

	if available.Status != metav1.ConditionTrue {
		logger.V(1).Info("DynamoComponentDeployment not available",
			"resourceName", resourceName,
			"status", available.Status,
			"reason", available.Reason,
			"message", available.Message)
		return false, fmt.Sprintf("not available: %s", available.Message)
	}

	return true, ""
}
// getUpdatedInProgressForComponent checks which services are still in progress for DCD pathway.
func (r *DynamoGraphDeploymentReconciler) getUpdatedInProgressForComponent(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, inProgress []string) []string {
logger := log.FromContext(ctx)
updatedInProgress := make([]string, 0, len(inProgress))
for _, serviceName := range inProgress {
isFullyUpdated, reason := r.checkComponentServiceFullyUpdated(ctx, dgd, serviceName)
if !isFullyUpdated {
logger.V(1).Info("service not fully updated", "serviceName", serviceName, "reason", reason)
updatedInProgress = append(updatedInProgress, serviceName)
}
}
return updatedInProgress
} }
func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) ReconcileResult { func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) ReconcileResult {
...@@ -552,13 +914,19 @@ func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Re ...@@ -552,13 +914,19 @@ func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Re
} }
} }
func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) { func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, restartState *dynamo.RestartState) (ReconcileResult, error) {
resources := []Resource{} resources := []Resource{}
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
existingRestartAnnotations, err := r.getExistingRestartAnnotationsDCD(ctx, dynamoDeployment)
if err != nil {
logger.Error(err, "failed to get existing restart annotations")
return ReconcileResult{}, fmt.Errorf("failed to get existing restart annotations: %w", err)
}
// generate the dynamoComponentsDeployments from the config // generate the dynamoComponentsDeployments from the config
defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig) defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, &defaultIngressSpec) dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, &defaultIngressSpec, restartState, existingRestartAnnotations)
if err != nil { if err != nil {
logger.Error(err, "failed to generate the DynamoComponentsDeployments") logger.Error(err, "failed to generate the DynamoComponentsDeployments")
return ReconcileResult{}, fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err) return ReconcileResult{}, fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err)
...@@ -577,7 +945,34 @@ func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(c ...@@ -577,7 +945,34 @@ func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(c
resources = append(resources, dynamoComponentDeployment) resources = append(resources, dynamoComponentDeployment)
} }
return r.checkResourcesReadiness(resources), nil // Check resource readiness
result := r.checkResourcesReadiness(resources)
return result, nil
}
func (r *DynamoGraphDeploymentReconciler) getExistingRestartAnnotationsDCD(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) (map[string]string, error) {
logger := log.FromContext(ctx)
restartAnnotations := make(map[string]string)
for serviceName := range dgd.Spec.Services {
dcdName := dynamo.GetDynamoComponentName(dgd, serviceName)
existingDCD := &nvidiacomv1alpha1.DynamoComponentDeployment{}
err := r.Get(ctx, types.NamespacedName{Name: dcdName, Namespace: dgd.Namespace}, existingDCD)
if err != nil && !errors.IsNotFound(err) {
return nil, fmt.Errorf("failed to get DynamoComponentDeployment: %w", err)
}
if errors.IsNotFound(err) {
logger.Info("DynamoComponentDeployment not found", "dcdName", dcdName)
continue
}
if existingDCD.Spec.Annotations != nil {
if restartAt := existingDCD.Spec.Annotations[consts.RestartAnnotation]; restartAt != "" {
restartAnnotations[serviceName] = restartAt
}
}
}
return restartAnnotations, nil
} }
// reconcilePVC reconciles a single top-level PVC defined in the DynamoGraphDeployment spec // reconcilePVC reconciles a single top-level PVC defined in the DynamoGraphDeployment spec
......
...@@ -397,9 +397,10 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -397,9 +397,10 @@ func Test_reconcileGroveResources(t *testing.T) {
Replicas: 2, Replicas: 2,
}, },
Status: grovev1alpha1.PodCliqueStatus{ Status: grovev1alpha1.PodCliqueStatus{
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
ReadyReplicas: 2, ReadyReplicas: 2,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
}, },
...@@ -443,9 +444,10 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -443,9 +444,10 @@ func Test_reconcileGroveResources(t *testing.T) {
Replicas: 1, Replicas: 1,
}, },
Status: grovev1alpha1.PodCliqueStatus{ Status: grovev1alpha1.PodCliqueStatus{
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: 1, ReadyReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
&grovev1alpha1.PodClique{ &grovev1alpha1.PodClique{
...@@ -457,9 +459,10 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -457,9 +459,10 @@ func Test_reconcileGroveResources(t *testing.T) {
Replicas: 2, Replicas: 2,
}, },
Status: grovev1alpha1.PodCliqueStatus{ Status: grovev1alpha1.PodCliqueStatus{
Replicas: 2, Replicas: 2,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: 1, // Only 1 ready, but 2 desired ReadyReplicas: 1, // Only 1 ready, but 2 desired
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
}, },
...@@ -516,9 +519,10 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -516,9 +519,10 @@ func Test_reconcileGroveResources(t *testing.T) {
Replicas: 1, Replicas: 1,
}, },
Status: grovev1alpha1.PodCliqueScalingGroupStatus{ Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
AvailableReplicas: 1, AvailableReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
&grovev1alpha1.PodCliqueScalingGroup{ &grovev1alpha1.PodCliqueScalingGroup{
...@@ -530,9 +534,10 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -530,9 +534,10 @@ func Test_reconcileGroveResources(t *testing.T) {
Replicas: 1, Replicas: 1,
}, },
Status: grovev1alpha1.PodCliqueScalingGroupStatus{ Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
AvailableReplicas: 1, AvailableReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
}, },
...@@ -586,9 +591,10 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -586,9 +591,10 @@ func Test_reconcileGroveResources(t *testing.T) {
Replicas: 1, Replicas: 1,
}, },
Status: grovev1alpha1.PodCliqueStatus{ Status: grovev1alpha1.PodCliqueStatus{
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: 1, ReadyReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
&grovev1alpha1.PodCliqueScalingGroup{ &grovev1alpha1.PodCliqueScalingGroup{
...@@ -600,9 +606,10 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -600,9 +606,10 @@ func Test_reconcileGroveResources(t *testing.T) {
Replicas: 2, Replicas: 2,
}, },
Status: grovev1alpha1.PodCliqueScalingGroupStatus{ Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
AvailableReplicas: 1, // Only 1 available, but 2 desired AvailableReplicas: 1, // Only 1 available, but 2 desired
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
}, },
...@@ -671,7 +678,7 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -671,7 +678,7 @@ func Test_reconcileGroveResources(t *testing.T) {
}, },
} }
result, err := reconciler.reconcileGroveResources(ctx, dgd) result, err := reconciler.reconcileGroveResources(ctx, dgd, nil)
g.Expect(err).NotTo(gomega.HaveOccurred()) g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(result).To(gomega.Equal(tt.wantReconcileResult)) g.Expect(result).To(gomega.Equal(tt.wantReconcileResult))
...@@ -679,6 +686,668 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -679,6 +686,668 @@ func Test_reconcileGroveResources(t *testing.T) {
} }
} }
func Test_computeRestartStatus(t *testing.T) {
ctx := context.Background()
newID := "restart-1"
oldID := "restart-0"
tests := []struct {
name string
dgdSpec v1alpha1.DynamoGraphDeploymentSpec
dgdStatus v1alpha1.DynamoGraphDeploymentStatus
existingResources []client.Object
groveEnabled bool
wantRestartStatus *v1alpha1.RestartStatus
}{
{
name: "no restart requested - returns nil",
},
{
name: "no restart at time - returns nil",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{},
},
},
{
name: "no restart requested but has completed status - preserves status",
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: oldID,
Phase: v1alpha1.RestartPhaseCompleted,
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: oldID,
Phase: v1alpha1.RestartPhaseCompleted,
},
},
{
name: "no restart requested but has restarting status - returns nil",
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: oldID,
Phase: v1alpha1.RestartPhaseRestarting,
},
},
},
{
name: "restart already processed (completed) - returns existing status",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseCompleted,
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseCompleted,
},
},
{
name: "restart already processed (failed) - returns existing status",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseFailed,
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseFailed,
},
},
{
name: "parallel restart - all services complete (DCD pathway)",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeParallel,
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
},
existingResources: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
Generation: 1,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 1,
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
},
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseCompleted,
},
},
{
name: "parallel restart - services still restarting (DCD pathway)",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Restart: &v1alpha1.Restart{
ID: newID,
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeParallel,
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ServiceName: "frontend",
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(1)),
},
"decode": {
ServiceName: "decode",
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: ptr.To(int32(2)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend", "decode"},
},
},
existingResources: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
Generation: 1,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 1,
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-decode",
Namespace: "default",
Generation: 2,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 1, // Not yet caught up
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionFalse,
},
},
},
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"decode"},
},
},
{
name: "sequential restart - first service starting",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeSequential,
Order: []string{"frontend", "decode"},
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
"decode": {
Replicas: ptr.To(int32(2)),
},
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
},
{
name: "sequential restart - first service done, moving to second",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeSequential,
Order: []string{"frontend", "decode"},
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
"decode": {
Replicas: ptr.To(int32(2)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
},
existingResources: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
Generation: 1,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 1,
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
},
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"decode"},
},
},
{
name: "sequential restart - all services complete",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeSequential,
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
},
existingResources: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
Generation: 1,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 1,
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
},
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseCompleted,
},
},
{
name: "default strategy (sequential) - no strategy specified",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
},
{
name: "parallel restart with empty services - returns completed",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeParallel,
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseCompleted,
},
},
{
name: "Grove pathway - parallel restart complete",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeParallel,
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
},
existingResources: []client.Object{
&grovev1alpha1.PodCliqueSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd",
Namespace: "default",
Generation: 1,
},
Status: grovev1alpha1.PodCliqueSetStatus{
ObservedGeneration: ptr.To(int64(1)),
},
},
&grovev1alpha1.PodClique{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-frontend",
Namespace: "default",
Generation: 1,
},
Spec: grovev1alpha1.PodCliqueSpec{
Replicas: 1,
},
Status: grovev1alpha1.PodCliqueStatus{
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
},
},
},
groveEnabled: true,
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseCompleted,
},
},
{
name: "Grove pathway - sequential restart in progress",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeSequential,
Order: []string{"frontend"},
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(2)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
},
existingResources: []client.Object{
&grovev1alpha1.PodClique{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-frontend",
Namespace: "default",
Generation: 1,
},
Spec: grovev1alpha1.PodCliqueSpec{
Replicas: 2,
},
Status: grovev1alpha1.PodCliqueStatus{
Replicas: 2,
UpdatedReplicas: 1, // Not fully updated
ReadyReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
},
},
},
groveEnabled: true,
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
},
{
name: "parallel restart - new restart request during ongoing restart resets to all services (DCD pathway)",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID, // NEW timestamp
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeParallel,
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
"decode": {
Replicas: ptr.To(int32(1)),
},
"completed": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: oldID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend", "decode"}, // completed service already done
},
},
existingResources: []client.Object{
// All services are now ready (simulating state after new restart timestamp is applied)
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
Generation: 2,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 2,
Conditions: []metav1.Condition{
{Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable, Status: metav1.ConditionFalse},
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-decode",
Namespace: "default",
Generation: 2,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 2,
Conditions: []metav1.Condition{
{Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable, Status: metav1.ConditionFalse},
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-completed",
Namespace: "default",
Generation: 2,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 2,
Conditions: []metav1.Condition{
{Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable, Status: metav1.ConditionFalse},
},
},
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"completed", "decode", "frontend"}, // ALL services, sorted
},
},
{
name: "sequential restart - new restart request during ongoing restart resets to first service (DCD pathway)",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID, // NEW timestamp
Strategy: &v1alpha1.RestartStrategy{
Type: v1alpha1.RestartStrategyTypeSequential,
Order: []string{"frontend", "decode", "worker"},
},
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
"decode": {
Replicas: ptr.To(int32(1)),
},
"worker": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: oldID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"decode"},
},
},
existingResources: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
Generation: 2,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 2,
Conditions: []metav1.Condition{
{Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable, Status: metav1.ConditionTrue},
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-decode",
Namespace: "default",
Generation: 2,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 2,
Conditions: []metav1.Condition{
{Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable, Status: metav1.ConditionTrue},
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-worker",
Namespace: "default",
Generation: 2,
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
ObservedGeneration: 2,
Conditions: []metav1.Condition{
{Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable, Status: metav1.ConditionTrue},
},
},
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"}, // Reset to FIRST service
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
s := scheme.Scheme
err := v1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
err = grovev1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
dgd := &v1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd",
Namespace: "default",
},
Spec: tt.dgdSpec,
Status: tt.dgdStatus,
}
var objects []client.Object
objects = append(objects, dgd)
objects = append(objects, tt.existingResources...)
fakeKubeClient := fake.NewClientBuilder().
WithScheme(s).
WithObjects(objects...).
WithStatusSubresource(objects...).
Build()
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoGraphDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{
Grove: controller_common.GroveConfig{
Enabled: tt.groveEnabled,
},
},
}
result := reconciler.computeRestartStatus(ctx, dgd)
if tt.wantRestartStatus == nil {
g.Expect(result).To(gomega.BeNil())
return
}
g.Expect(result).NotTo(gomega.BeNil())
g.Expect(result).To(gomega.Equal(tt.wantRestartStatus))
})
}
}
func Test_reconcileDynamoComponentsDeployments(t *testing.T) { func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
ctx := context.Background() ctx := context.Background()
...@@ -1238,7 +1907,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1238,7 +1907,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
Config: controller_common.Config{}, Config: controller_common.Config{},
} }
result, err := reconciler.reconcileDynamoComponentsDeployments(ctx, dgd) result, err := reconciler.reconcileDynamoComponentsDeployments(ctx, dgd, nil)
g.Expect(err).NotTo(gomega.HaveOccurred()) g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(result).To(gomega.Equal(tt.wantReconcileResult)) g.Expect(result).To(gomega.Equal(tt.wantReconcileResult))
......
...@@ -44,6 +44,142 @@ import ( ...@@ -44,6 +44,142 @@ import (
networkingv1 "k8s.io/api/networking/v1" networkingv1 "k8s.io/api/networking/v1"
) )
// RestartState describes which services of a DGD must carry the restart
// annotation and which value that annotation gets.
type RestartState struct {
	// Timestamp is the value written into the restart annotation.
	// NOTE(review): historically documented as RFC3339, but DetermineRestartState
	// fills it from the restart ID / observed ID — confirm the expected format.
	Timestamp string
	// ServicesToAnnotate is keyed by service name; a true value marks the
	// service as one that should receive the restart annotation.
	ServicesToAnnotate map[string]bool
}

// ShouldAnnotateService reports whether serviceName is marked for the restart
// annotation. A nil receiver or a nil/empty ServicesToAnnotate map yields false.
func (s *RestartState) ShouldAnnotateService(serviceName string) bool {
	if s == nil {
		return false
	}
	// Indexing a nil map returns the zero value, so no separate nil-map guard is required.
	marked, found := s.ServicesToAnnotate[serviceName]
	return found && marked
}
// DetermineRestartState derives the RestartState (annotation value plus the set
// of services to annotate) from the DGD spec and the given restart status.
//
// It returns nil when restartStatus is nil, or when the spec carries no restart
// ID and the status has no observed ID either. When the spec has no restart ID
// but the status still records an observed ID, every service keeps that
// observed ID as its annotation value. Otherwise the spec's restart ID is used:
// all services are annotated for a parallel restart or for a restart whose ID
// was already observed and completed; a sequential restart annotates only the
// subset computed by getServicesToAnnotateForSequentialRestart.
func DetermineRestartState(dgd *v1alpha1.DynamoGraphDeployment, restartStatus *v1alpha1.RestartStatus) *RestartState {
	if restartStatus == nil {
		return nil
	}

	restart := dgd.Spec.Restart
	if restart == nil || restart.ID == "" {
		if restartStatus.ObservedID == "" {
			return nil
		}
		// No restart requested now, but an earlier one was observed: keep its annotation everywhere.
		return &RestartState{
			Timestamp:          restartStatus.ObservedID,
			ServicesToAnnotate: getAllServiceNames(dgd),
		}
	}

	state := &RestartState{Timestamp: restart.ID}

	// restart.ID is non-empty here, so equality with ObservedID also implies ObservedID is set.
	alreadyCompleted := restartStatus.ObservedID == restart.ID &&
		restartStatus.Phase == v1alpha1.RestartPhaseCompleted

	if alreadyCompleted || IsParallelRestart(dgd) {
		state.ServicesToAnnotate = getAllServiceNames(dgd)
	} else {
		// Sequential restart (default or specified).
		state.ServicesToAnnotate = getServicesToAnnotateForSequentialRestart(dgd, restartStatus)
	}
	return state
}
// getAllServiceNames builds a set (name -> true) containing every service key
// present in dgd.Spec.Services.
func getAllServiceNames(dgd *v1alpha1.DynamoGraphDeployment) map[string]bool {
	specServices := dgd.Spec.Services
	names := make(map[string]bool, len(specServices))
	for name := range specServices {
		names[name] = true
	}
	return names
}
// IsParallelRestart reports whether the DGD explicitly selects the parallel
// restart strategy. A missing restart section or a missing strategy counts as
// sequential, so false is returned in those cases.
func IsParallelRestart(dgd *v1alpha1.DynamoGraphDeployment) bool {
	restart := dgd.Spec.Restart
	return restart != nil &&
		restart.Strategy != nil &&
		restart.Strategy.Type == v1alpha1.RestartStrategyTypeParallel
}
// getServicesToAnnotateForSequentialRestart returns the set of services that
// should carry the restart annotation while a sequential restart is running.
//
// With an empty restart order the set is empty. When status is nil, its phase
// is Pending, or nothing is in progress, only the first service of the order is
// selected. Otherwise every service from the start of the order up to and
// including the last in-progress one is selected; if none of the in-progress
// names appear in the order, the set is empty.
func getServicesToAnnotateForSequentialRestart(dgd *v1alpha1.DynamoGraphDeployment, status *v1alpha1.RestartStatus) map[string]bool {
	annotated := make(map[string]bool)

	order := GetRestartOrder(dgd)
	if len(order) == 0 {
		return annotated
	}

	notStarted := status == nil ||
		status.Phase == v1alpha1.RestartPhasePending ||
		len(status.InProgress) == 0
	if notStarted {
		annotated[order[0]] = true
		return annotated
	}

	active := make(map[string]bool, len(status.InProgress))
	for _, name := range status.InProgress {
		active[name] = true
	}

	// Walk backwards to the last position in the order that is in progress;
	// last ends at -1 when no in-progress service appears in the order.
	last := len(order) - 1
	for last >= 0 && !active[order[last]] {
		last--
	}

	// Services ahead of the in-progress one have already restarted and must
	// keep their annotation, so the whole prefix is selected.
	for _, name := range order[:last+1] {
		annotated[name] = true
	}
	return annotated
}
// GetRestartOrder returns the service order used for a sequential restart.
// A non-empty Spec.Restart.Strategy.Order is returned as-is (the same slice, not
// a copy); otherwise the service names are returned sorted alphabetically so
// the result is deterministic.
func GetRestartOrder(dgd *v1alpha1.DynamoGraphDeployment) []string {
	if restart := dgd.Spec.Restart; restart != nil && restart.Strategy != nil {
		if explicit := restart.Strategy.Order; len(explicit) > 0 {
			return explicit
		}
	}

	names := make([]string, 0, len(dgd.Spec.Services))
	for name := range dgd.Spec.Services {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// ServiceConfig represents the YAML configuration structure for a service // ServiceConfig represents the YAML configuration structure for a service
type DynamoConfig struct { type DynamoConfig struct {
Enabled bool `yaml:"enabled"` Enabled bool `yaml:"enabled"`
...@@ -113,7 +249,7 @@ func ParseDynDeploymentConfig(ctx context.Context, jsonContent []byte) (DynDeplo ...@@ -113,7 +249,7 @@ func ParseDynDeploymentConfig(ctx context.Context, jsonContent []byte) (DynDeplo
} }
// GenerateDynamoComponentsDeployments generates a map of DynamoComponentDeployments from a DynamoGraphConfig // GenerateDynamoComponentsDeployments generates a map of DynamoComponentDeployments from a DynamoGraphConfig
func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphDeployment *v1alpha1.DynamoGraphDeployment, defaultIngressSpec *v1alpha1.IngressSpec) (map[string]*v1alpha1.DynamoComponentDeployment, error) { func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphDeployment *v1alpha1.DynamoGraphDeployment, defaultIngressSpec *v1alpha1.IngressSpec, restartState *RestartState, existingRestartAnnotations map[string]string) (map[string]*v1alpha1.DynamoComponentDeployment, error) {
deployments := make(map[string]*v1alpha1.DynamoComponentDeployment) deployments := make(map[string]*v1alpha1.DynamoComponentDeployment)
for componentName, component := range parentDynamoGraphDeployment.Spec.Services { for componentName, component := range parentDynamoGraphDeployment.Spec.Services {
dynamoNamespace := getDynamoNamespace(parentDynamoGraphDeployment, component) dynamoNamespace := getDynamoNamespace(parentDynamoGraphDeployment, component)
...@@ -146,6 +282,23 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD ...@@ -146,6 +282,23 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD
} }
} }
// Apply restart annotation if this service should be restarted.
// For services not in the current restart order, preserve their existing annotation
// to avoid triggering unwanted rollouts when a new restart begins.
if restartState.ShouldAnnotateService(componentName) {
if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
}
deployment.Spec.Annotations[commonconsts.RestartAnnotation] = restartState.Timestamp
} else if existingRestartAnnotations != nil {
if existingRestartAt, ok := existingRestartAnnotations[componentName]; ok && existingRestartAt != "" {
if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
}
deployment.Spec.Annotations[commonconsts.RestartAnnotation] = existingRestartAt
}
}
if component.ComponentType == commonconsts.ComponentTypePlanner { if component.ComponentType == commonconsts.ComponentTypePlanner {
// ensure that the extraPodSpec is not nil // ensure that the extraPodSpec is not nil
if deployment.Spec.ExtraPodSpec == nil { if deployment.Spec.ExtraPodSpec == nil {
...@@ -959,6 +1112,8 @@ func GenerateGrovePodCliqueSet( ...@@ -959,6 +1112,8 @@ func GenerateGrovePodCliqueSet(
dynamoDeployment *v1alpha1.DynamoGraphDeployment, dynamoDeployment *v1alpha1.DynamoGraphDeployment,
controllerConfig controller_common.Config, controllerConfig controller_common.Config,
secretsRetriever SecretsRetriever, secretsRetriever SecretsRetriever,
restartState *RestartState,
existingRestartAnnotations map[string]string,
) (*grovev1alpha1.PodCliqueSet, error) { ) (*grovev1alpha1.PodCliqueSet, error) {
gangSet := &grovev1alpha1.PodCliqueSet{} gangSet := &grovev1alpha1.PodCliqueSet{}
gangSet.Name = dynamoDeployment.Name gangSet.Name = dynamoDeployment.Name
...@@ -1040,6 +1195,23 @@ func GenerateGrovePodCliqueSet( ...@@ -1040,6 +1195,23 @@ func GenerateGrovePodCliqueSet(
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to generate annotations: %w", err) return nil, fmt.Errorf("failed to generate annotations: %w", err)
} }
// Apply restart annotation if this service should be restarted.
// For services not in the current restart order, preserve their existing annotation
// to avoid triggering unwanted rollouts when a new restart begins.
if restartState.ShouldAnnotateService(serviceName) {
if annotations == nil {
annotations = make(map[string]string)
}
annotations[commonconsts.RestartAnnotation] = restartState.Timestamp
} else if existingRestartAnnotations != nil {
if existingTimestamp, ok := existingRestartAnnotations[serviceName]; ok {
if annotations == nil {
annotations = make(map[string]string)
}
annotations[commonconsts.RestartAnnotation] = existingTimestamp
}
}
clique.Annotations = annotations clique.Annotations = annotations
// Inject kai-scheduler settings if enabled // Inject kai-scheduler settings if enabled
......
...@@ -722,7 +722,7 @@ func TestGenerateDynamoComponentsDeployments(t *testing.T) { ...@@ -722,7 +722,7 @@ func TestGenerateDynamoComponentsDeployments(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got, err := GenerateDynamoComponentsDeployments(context.Background(), tt.args.parentDynamoGraphDeployment, tt.args.ingressSpec) got, err := GenerateDynamoComponentsDeployments(context.Background(), tt.args.parentDynamoGraphDeployment, tt.args.ingressSpec, nil, nil)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("GenerateDynamoComponentsDeployments() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("GenerateDynamoComponentsDeployments() error = %v, wantErr %v", err, tt.wantErr)
return return
...@@ -756,7 +756,7 @@ func Test_GetDynamoComponentDeploymentsGlobalNamespace(t *testing.T) { ...@@ -756,7 +756,7 @@ func Test_GetDynamoComponentDeploymentsGlobalNamespace(t *testing.T) {
}, },
} }
got, err := GenerateDynamoComponentsDeployments(context.Background(), dgd, nil) got, err := GenerateDynamoComponentsDeployments(context.Background(), dgd, nil, nil, nil)
if !assert.NoError(t, err) { if !assert.NoError(t, err) {
return return
} }
...@@ -3548,7 +3548,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) { ...@@ -3548,7 +3548,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got, err := GenerateGrovePodCliqueSet(tt.args.ctx, tt.args.dynamoDeployment, tt.args.controllerConfig, nil) got, err := GenerateGrovePodCliqueSet(tt.args.ctx, tt.args.dynamoDeployment, tt.args.controllerConfig, nil, nil, nil)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("GenerateGrovePodCliqueSet() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("GenerateGrovePodCliqueSet() error = %v, wantErr %v", err, tt.wantErr)
return return
...@@ -3600,7 +3600,7 @@ func Test_GeneratePodCliqueSetGlobalDynamoNamespace(t *testing.T) { ...@@ -3600,7 +3600,7 @@ func Test_GeneratePodCliqueSetGlobalDynamoNamespace(t *testing.T) {
}, },
} }
got, err := GenerateGrovePodCliqueSet(context.Background(), dynamoDeployment, controller_common.Config{}, nil) got, err := GenerateGrovePodCliqueSet(context.Background(), dynamoDeployment, controller_common.Config{}, nil, nil, nil)
if !assert.NoError(t, err) { if !assert.NoError(t, err) {
return return
} }
...@@ -4683,7 +4683,7 @@ func TestGenerateGrovePodCliqueSet_StartsAfterDependencies(t *testing.T) { ...@@ -4683,7 +4683,7 @@ func TestGenerateGrovePodCliqueSet_StartsAfterDependencies(t *testing.T) {
NatsAddress: "nats-address", NatsAddress: "nats-address",
} }
got, err := GenerateGrovePodCliqueSet(context.Background(), dynamoDeployment, controllerConfig, secretsRetriever) got, err := GenerateGrovePodCliqueSet(context.Background(), dynamoDeployment, controllerConfig, secretsRetriever, nil, nil)
if err != nil { if err != nil {
t.Errorf("GenerateGrovePodCliqueSet() error = %v", err) t.Errorf("GenerateGrovePodCliqueSet() error = %v", err)
return return
...@@ -5984,3 +5984,552 @@ func TestGenerateBasePodSpec_SecurityContext(t *testing.T) { ...@@ -5984,3 +5984,552 @@ func TestGenerateBasePodSpec_SecurityContext(t *testing.T) {
}) })
} }
} }
// TestDetermineGroveRestartState exercises DetermineRestartState across the
// nil/empty, parallel, sequential, default-order and completed scenarios,
// checking the set of services selected for annotation and the annotation value.
func TestDetermineGroveRestartState(t *testing.T) {
	restartID := "restart-1"
	oldRestartID := "restart-0"

	tests := []struct {
		name          string
		dgd           *v1alpha1.DynamoGraphDeployment
		restartStatus *v1alpha1.RestartStatus
		wantNil       bool
		wantSvcs      []string // expected services to annotate (order-insensitive)
		wantTimestamp *string
	}{
		{
			name: "restartStatus nil returns nil",
			dgd: &v1alpha1.DynamoGraphDeployment{
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Frontend": {},
						"Worker":   {},
					},
				},
			},
			wantNil: true,
		},
		{
			name: "empty spec.restart.id and empty restartStatus.observedID returns nil",
			dgd: &v1alpha1.DynamoGraphDeployment{
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Frontend": {},
						"Worker":   {},
					},
					Restart: &v1alpha1.Restart{},
				},
			},
			restartStatus: &v1alpha1.RestartStatus{
				ObservedID: "",
			},
			wantNil: true,
		},
		{
			name: "new parallel restart annotates all services",
			dgd: &v1alpha1.DynamoGraphDeployment{
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Frontend": {},
						"Worker":   {},
					},
					Restart: &v1alpha1.Restart{
						ID: restartID,
						Strategy: &v1alpha1.RestartStrategy{
							Type: v1alpha1.RestartStrategyTypeParallel,
						},
					},
				},
			},
			restartStatus: &v1alpha1.RestartStatus{
				ObservedID: restartID,
				Phase:      v1alpha1.RestartPhaseRestarting,
				InProgress: []string{"Frontend", "Worker"},
			},
			wantSvcs:      []string{"Frontend", "Worker"},
			wantTimestamp: ptr.To(restartID),
		},
		{
			name: "new sequential restart annotates only first service",
			dgd: &v1alpha1.DynamoGraphDeployment{
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Frontend": {},
						"Worker":   {},
					},
					Restart: &v1alpha1.Restart{
						ID: restartID,
						Strategy: &v1alpha1.RestartStrategy{
							Type:  v1alpha1.RestartStrategyTypeSequential,
							Order: []string{"Worker", "Frontend"},
						},
					},
				},
			},
			restartStatus: &v1alpha1.RestartStatus{
				ObservedID: restartID,
				Phase:      v1alpha1.RestartPhaseRestarting,
				InProgress: []string{"Worker"},
			},
			wantSvcs:      []string{"Worker"},
			wantTimestamp: ptr.To(restartID),
		},
		{
			name: "sequential restart in progress annotates completed + in-progress",
			dgd: &v1alpha1.DynamoGraphDeployment{
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Frontend": {},
						"Worker":   {},
						"Backend":  {},
					},
					Restart: &v1alpha1.Restart{
						ID: restartID,
						Strategy: &v1alpha1.RestartStrategy{
							Type:  v1alpha1.RestartStrategyTypeSequential,
							Order: []string{"Frontend", "Worker", "Backend"},
						},
					},
				},
			},
			restartStatus: &v1alpha1.RestartStatus{
				ObservedID: restartID,
				Phase:      v1alpha1.RestartPhaseRestarting,
				InProgress: []string{"Worker"},
			},
			wantSvcs:      []string{"Frontend", "Worker"}, // Frontend completed, Worker in progress
			wantTimestamp: ptr.To(restartID),
		},
		{
			// No strategy given: the order falls back to alphabetical
			// (Backend, Frontend, Worker), so Worker in progress selects all three.
			name: "default restart in progress annotates completed + in-progress",
			dgd: &v1alpha1.DynamoGraphDeployment{
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Frontend": {},
						"Worker":   {},
						"Backend":  {},
					},
					Restart: &v1alpha1.Restart{
						ID: restartID,
					},
				},
			},
			restartStatus: &v1alpha1.RestartStatus{
				ObservedID: restartID,
				Phase:      v1alpha1.RestartPhaseRestarting,
				InProgress: []string{"Worker"},
			},
			wantSvcs:      []string{"Frontend", "Worker", "Backend"},
			wantTimestamp: ptr.To(restartID),
		},
		{
			name: "completed restart with empty spec restart preserves all annotations",
			dgd: &v1alpha1.DynamoGraphDeployment{
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Frontend": {},
						"Worker":   {},
					},
				},
			},
			restartStatus: &v1alpha1.RestartStatus{
				ObservedID: oldRestartID,
				Phase:      v1alpha1.RestartPhaseCompleted,
			},
			wantSvcs:      []string{"Frontend", "Worker"},
			wantTimestamp: ptr.To(oldRestartID),
		},
		{
			name: "completed restart",
			dgd: &v1alpha1.DynamoGraphDeployment{
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Frontend": {},
						"Worker":   {},
					},
					Restart: &v1alpha1.Restart{
						ID: restartID,
					},
				},
			},
			restartStatus: &v1alpha1.RestartStatus{
				ObservedID: restartID,
				Phase:      v1alpha1.RestartPhaseCompleted,
			},
			wantSvcs:      []string{"Frontend", "Worker"},
			wantTimestamp: ptr.To(restartID),
		},
		{
			name: "new restart after completed restart",
			dgd: &v1alpha1.DynamoGraphDeployment{
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Frontend": {},
						"Worker":   {},
					},
					Restart: &v1alpha1.Restart{
						ID: restartID, // new ID, differs from the observed one
						Strategy: &v1alpha1.RestartStrategy{
							Type: v1alpha1.RestartStrategyTypeParallel,
						},
					},
				},
			},
			restartStatus: &v1alpha1.RestartStatus{
				ObservedID: oldRestartID,
				Phase:      v1alpha1.RestartPhaseCompleted,
			},
			wantSvcs:      []string{"Frontend", "Worker"},
			wantTimestamp: ptr.To(restartID),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := DetermineRestartState(tt.dgd, tt.restartStatus)

			if tt.wantNil {
				if got != nil {
					t.Errorf("DetermineRestartState() = %v, want nil", got)
				}
				return
			}
			if got == nil {
				t.Errorf("DetermineRestartState() = nil, want non-nil")
				return
			}

			// Collect only the entries marked true, then compare order-insensitively.
			var gotSvcs []string
			for svc, shouldAnnotate := range got.ServicesToAnnotate {
				if shouldAnnotate {
					gotSvcs = append(gotSvcs, svc)
				}
			}
			sort.Strings(gotSvcs)

			// Sort a copy so the shared test table is not mutated.
			wantSvcs := append([]string(nil), tt.wantSvcs...)
			sort.Strings(wantSvcs)

			if !reflect.DeepEqual(gotSvcs, wantSvcs) {
				t.Errorf("DetermineRestartState() services = %v, want %v", gotSvcs, wantSvcs)
			}
			if tt.wantTimestamp != nil && got.Timestamp != *tt.wantTimestamp {
				t.Errorf("DetermineRestartState() timestamp = %v, want %v", got.Timestamp, *tt.wantTimestamp)
			}
		})
	}
}
// TestGroveRestartStateShouldAnnotateService checks RestartState.ShouldAnnotateService
// for a nil receiver, a nil service map, a listed service and an unlisted service.
func TestGroveRestartStateShouldAnnotateService(t *testing.T) {
	type testCase struct {
		name     string
		receiver *RestartState
		service  string
		expected bool
	}

	cases := []testCase{
		{
			name:     "nil state returns false",
			receiver: nil,
			service:  "Frontend",
			expected: false,
		},
		{
			name: "nil services map returns false",
			receiver: &RestartState{
				Timestamp:          "2024-01-01T00:00:00Z",
				ServicesToAnnotate: nil,
			},
			service:  "Frontend",
			expected: false,
		},
		{
			name: "service in map returns true",
			receiver: &RestartState{
				Timestamp:          "2024-01-01T00:00:00Z",
				ServicesToAnnotate: map[string]bool{"Frontend": true, "Worker": true},
			},
			service:  "Frontend",
			expected: true,
		},
		{
			name: "service not in map returns false",
			receiver: &RestartState{
				Timestamp:          "2024-01-01T00:00:00Z",
				ServicesToAnnotate: map[string]bool{"Frontend": true},
			},
			service:  "Worker",
			expected: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := tc.receiver.ShouldAnnotateService(tc.service)
			if got != tc.expected {
				t.Errorf("ShouldAnnotateService() = %v, want %v", got, tc.expected)
			}
		})
	}
}
func TestGenerateGrovePodCliqueSet_RestartAnnotations(t *testing.T) {
restartTimestamp := "2024-01-05T10:00:00Z"
tests := []struct {
name string
restartState *RestartState
services map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec
wantAnnotationsPerClique map[string]bool // clique name -> should have restart annotation
wantPreservedAnnotations map[string]map[string]string // clique name -> preserved annotations to verify
}{
{
name: "nil restartState - no annotations",
restartState: nil,
services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"Frontend": {
ComponentType: commonconsts.ComponentTypeFrontend,
Replicas: ptr.To(int32(1)),
},
"Worker": {
ComponentType: commonconsts.ComponentTypeWorker,
Replicas: ptr.To(int32(1)),
},
},
wantAnnotationsPerClique: map[string]bool{
"frontend": false,
"worker": false,
},
},
{
name: "nil ServicesToAnnotate - no annotations",
restartState: &RestartState{
Timestamp: restartTimestamp,
ServicesToAnnotate: nil,
},
services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"Frontend": {
ComponentType: commonconsts.ComponentTypeFrontend,
Replicas: ptr.To(int32(1)),
},
},
wantAnnotationsPerClique: map[string]bool{
"frontend": false,
},
},
{
name: "all services annotated - parallel restart",
restartState: &RestartState{
Timestamp: restartTimestamp,
ServicesToAnnotate: map[string]bool{"Frontend": true, "Worker": true},
},
services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"Frontend": {
ComponentType: commonconsts.ComponentTypeFrontend,
Replicas: ptr.To(int32(1)),
},
"Worker": {
ComponentType: commonconsts.ComponentTypeWorker,
Replicas: ptr.To(int32(1)),
},
},
wantAnnotationsPerClique: map[string]bool{
"frontend": true,
"worker": true,
},
},
{
name: "only first service annotated - sequential restart start",
restartState: &RestartState{
Timestamp: restartTimestamp,
ServicesToAnnotate: map[string]bool{"Frontend": true},
},
services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"Frontend": {
ComponentType: commonconsts.ComponentTypeFrontend,
Replicas: ptr.To(int32(1)),
},
"Worker": {
ComponentType: commonconsts.ComponentTypeWorker,
Replicas: ptr.To(int32(1)),
},
},
wantAnnotationsPerClique: map[string]bool{
"frontend": true,
"worker": false,
},
},
{
name: "completed services keep annotation - sequential restart in progress",
restartState: &RestartState{
Timestamp: restartTimestamp,
ServicesToAnnotate: map[string]bool{"Frontend": true, "Worker": true},
},
services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"Frontend": {
ComponentType: commonconsts.ComponentTypeFrontend,
Replicas: ptr.To(int32(1)),
},
"Worker": {
ComponentType: commonconsts.ComponentTypeWorker,
Replicas: ptr.To(int32(1)),
},
"Backend": {
ComponentType: commonconsts.ComponentTypeWorker,
Replicas: ptr.To(int32(1)),
},
},
wantAnnotationsPerClique: map[string]bool{
"frontend": true,
"worker": true,
"backend": false,
},
},
{
name: "service not in DGD spec - annotation still applied if in ServicesToAnnotate",
restartState: &RestartState{
Timestamp: restartTimestamp,
ServicesToAnnotate: map[string]bool{"Frontend": true, "NonExistent": true},
},
services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"Frontend": {
ComponentType: commonconsts.ComponentTypeFrontend,
Replicas: ptr.To(int32(1)),
},
},
wantAnnotationsPerClique: map[string]bool{
"frontend": true,
},
},
{
name: "multinode service - all cliques get restart annotation",
restartState: &RestartState{
Timestamp: restartTimestamp,
ServicesToAnnotate: map[string]bool{"Worker": true},
},
services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
ComponentType: commonconsts.ComponentTypeWorker,
Replicas: ptr.To(int32(2)),
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 2,
},
},
},
wantAnnotationsPerClique: map[string]bool{
"worker-ldr": true,
"worker-wkr": true,
},
},
{
name: "preserves existing annotations when adding restart annotation",
restartState: &RestartState{
Timestamp: restartTimestamp,
ServicesToAnnotate: map[string]bool{"Frontend": true},
},
services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"Frontend": {
ComponentType: commonconsts.ComponentTypeFrontend,
Replicas: ptr.To(int32(1)),
Annotations: map[string]string{
"custom-annotation": "custom-value",
"another-key": "another-value",
},
},
},
wantAnnotationsPerClique: map[string]bool{
"frontend": true,
},
wantPreservedAnnotations: map[string]map[string]string{
"frontend": {
"custom-annotation": "custom-value",
"another-key": "another-value",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dgd := &v1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd",
Namespace: "default",
},
Spec: v1alpha1.DynamoGraphDeploymentSpec{
Services: tt.services,
},
}
controllerConfig := controller_common.Config{
EtcdAddress: "etcd-address",
NatsAddress: "nats-address",
}
got, err := GenerateGrovePodCliqueSet(context.Background(), dgd, controllerConfig, nil, tt.restartState, nil)
if err != nil {
t.Fatalf("GenerateGrovePodCliqueSet() error = %v", err)
}
// Build a map of clique annotations
cliqueAnnotations := make(map[string]map[string]string)
for _, clique := range got.Spec.Template.Cliques {
cliqueAnnotations[clique.Name] = clique.Annotations
}
// Verify restart annotations per clique
for cliqueName, shouldHaveAnnotation := range tt.wantAnnotationsPerClique {
annotations := cliqueAnnotations[cliqueName]
if shouldHaveAnnotation {
if annotations == nil {
t.Errorf("Clique %q: expected restart annotation, but annotations is nil", cliqueName)
continue
}
restartValue, exists := annotations[commonconsts.RestartAnnotation]
if !exists {
t.Errorf("Clique %q: expected restart annotation %q, but not found. Annotations: %v",
cliqueName, commonconsts.RestartAnnotation, annotations)
continue
}
if restartValue != restartTimestamp {
t.Errorf("Clique %q: restart annotation value = %q, want %q",
cliqueName, restartValue, restartTimestamp)
}
} else {
if annotations != nil {
if _, exists := annotations[commonconsts.RestartAnnotation]; exists {
t.Errorf("Clique %q: unexpected restart annotation found", cliqueName)
}
}
}
}
// Verify no unexpected restart annotations on cliques not in wantAnnotationsPerClique
for cliqueName, annotations := range cliqueAnnotations {
if _, specified := tt.wantAnnotationsPerClique[cliqueName]; !specified {
if annotations != nil {
if _, exists := annotations[commonconsts.RestartAnnotation]; exists {
t.Errorf("Clique %q: unexpected restart annotation found (clique not in wantAnnotationsPerClique)", cliqueName)
}
}
}
}
// Verify preserved annotations
for cliqueName, expectedAnnotations := range tt.wantPreservedAnnotations {
annotations := cliqueAnnotations[cliqueName]
if annotations == nil {
t.Errorf("Clique %q: expected preserved annotations, but annotations is nil", cliqueName)
continue
}
for key, expectedValue := range expectedAnnotations {
if actualValue, exists := annotations[key]; !exists {
t.Errorf("Clique %q: expected preserved annotation %q, but not found", cliqueName, key)
} else if actualValue != expectedValue {
t.Errorf("Clique %q: preserved annotation %q = %q, want %q",
cliqueName, key, actualValue, expectedValue)
}
}
}
})
}
}
...@@ -68,14 +68,14 @@ func GetComponentReadinessAndServiceReplicaStatuses(ctx context.Context, client ...@@ -68,14 +68,14 @@ func GetComponentReadinessAndServiceReplicaStatuses(ctx context.Context, client
if isMultinode { if isMultinode {
// Check PodCliqueScalingGroup: spec.replicas == status.availableReplicas // Check PodCliqueScalingGroup: spec.replicas == status.availableReplicas
ok, reason, serviceStatus := checkPCSGReady(ctx, client, resourceName, dgd.Namespace, logger) ok, reason, serviceStatus := CheckPCSGReady(ctx, client, resourceName, dgd.Namespace, logger)
serviceStatuses[serviceName] = serviceStatus serviceStatuses[serviceName] = serviceStatus
if !ok { if !ok {
notReadyComponents = append(notReadyComponents, fmt.Sprintf("pcsg/%s: %s", resourceName, reason)) notReadyComponents = append(notReadyComponents, fmt.Sprintf("pcsg/%s: %s", resourceName, reason))
} }
} else { } else {
// Check PodClique: spec.replicas == status.readyReplicas // Check PodClique: spec.replicas == status.readyReplicas
ok, reason, serviceStatus := checkPodCliqueReady(ctx, client, resourceName, dgd.Namespace, logger) ok, reason, serviceStatus := CheckPodCliqueReady(ctx, client, resourceName, dgd.Namespace, logger)
serviceStatuses[serviceName] = serviceStatus serviceStatuses[serviceName] = serviceStatus
if !ok { if !ok {
notReadyComponents = append(notReadyComponents, fmt.Sprintf("podclique/%s: %s", resourceName, reason)) notReadyComponents = append(notReadyComponents, fmt.Sprintf("podclique/%s: %s", resourceName, reason))
...@@ -90,8 +90,11 @@ func GetComponentReadinessAndServiceReplicaStatuses(ctx context.Context, client ...@@ -90,8 +90,11 @@ func GetComponentReadinessAndServiceReplicaStatuses(ctx context.Context, client
return true, "", serviceStatuses return true, "", serviceStatuses
} }
// checkPodCliqueReady checks if a PodClique has spec.replicas == status.readyReplicas // CheckPodCliqueReady determines if a Grove PodClique is fully ready and available.
func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string, v1alpha1.ServiceReplicaStatus) { // It checks various status fields to ensure all replicas are available and the PodClique
// configuration has been fully applied. This is the PodClique equivalent of IsDeploymentReady
// for standard Kubernetes Deployments.
func CheckPodCliqueReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string, v1alpha1.ServiceReplicaStatus) {
podClique := &grovev1alpha1.PodClique{} podClique := &grovev1alpha1.PodClique{}
err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, podClique) err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, podClique)
if err != nil { if err != nil {
...@@ -105,6 +108,20 @@ func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName ...@@ -105,6 +108,20 @@ func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName
desiredReplicas := podClique.Spec.Replicas desiredReplicas := podClique.Spec.Replicas
readyReplicas := podClique.Status.ReadyReplicas readyReplicas := podClique.Status.ReadyReplicas
updatedReplicas := podClique.Status.UpdatedReplicas
replicas := podClique.Status.Replicas
observedGeneration := podClique.Status.ObservedGeneration
generation := podClique.Generation
logger.V(1).Info("CheckPodCliqueFullyUpdated",
"resourceName", resourceName,
"generation", podClique.Generation,
"observedGeneration", podClique.Status.ObservedGeneration,
"desiredReplicas", desiredReplicas,
"readyReplicas", readyReplicas,
"updatedReplicas", updatedReplicas,
"replicas", replicas,
)
serviceStatus := v1alpha1.ServiceReplicaStatus{ serviceStatus := v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindPodClique, ComponentKind: v1alpha1.ComponentKindPodClique,
...@@ -114,8 +131,17 @@ func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName ...@@ -114,8 +131,17 @@ func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName
ReadyReplicas: &readyReplicas, ReadyReplicas: &readyReplicas,
} }
if observedGeneration == nil {
logger.V(1).Info("PodClique observedGeneration is nil", "resourceName", resourceName)
return false, "observedGeneration is nil", serviceStatus
}
if observedGeneration != nil && *observedGeneration < generation {
logger.V(1).Info("PodClique spec not yet processed", "resourceName", resourceName, "generation", generation, "observedGeneration", observedGeneration)
return false, fmt.Sprintf("spec not yet processed: generation=%d, observedGeneration=%d", generation, *observedGeneration), serviceStatus
}
if desiredReplicas == 0 { if desiredReplicas == 0 {
// No replicas desired, so it's ready
return true, "", serviceStatus return true, "", serviceStatus
} }
...@@ -124,11 +150,24 @@ func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName ...@@ -124,11 +150,24 @@ func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName
return false, fmt.Sprintf("desired=%d, ready=%d", desiredReplicas, readyReplicas), serviceStatus return false, fmt.Sprintf("desired=%d, ready=%d", desiredReplicas, readyReplicas), serviceStatus
} }
if desiredReplicas != updatedReplicas {
logger.V(1).Info("PodClique not fully updated", "resourceName", resourceName, "desired", desiredReplicas, "updated", updatedReplicas)
return false, fmt.Sprintf("desired=%d, updated=%d", desiredReplicas, updatedReplicas), serviceStatus
}
if replicas != desiredReplicas {
logger.V(1).Info("PodClique performing rolling update", "resourceName", resourceName, "desired", desiredReplicas, "replicas", replicas)
return false, fmt.Sprintf("performing rolling update: desired=%d, replicas=%d", desiredReplicas, replicas), serviceStatus
}
return true, "", serviceStatus return true, "", serviceStatus
} }
// checkPCSGReady checks if a PodCliqueScalingGroup has spec.replicas == status.availableReplicas // CheckPCSGReady determines if a Grove PodCliqueScalingGroup is fully ready and available.
func checkPCSGReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string, v1alpha1.ServiceReplicaStatus) { // It checks various status fields to ensure all replicas are available and the PodClique
// configuration has been fully applied. This is the PodCliqueScalingGroup equivalent of IsDeploymentReady
// for standard Kubernetes Deployments.
func CheckPCSGReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string, v1alpha1.ServiceReplicaStatus) {
pcsg := &grovev1alpha1.PodCliqueScalingGroup{} pcsg := &grovev1alpha1.PodCliqueScalingGroup{}
err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, pcsg) err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, pcsg)
if err != nil { if err != nil {
...@@ -142,6 +181,20 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam ...@@ -142,6 +181,20 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam
desiredReplicas := pcsg.Spec.Replicas desiredReplicas := pcsg.Spec.Replicas
availableReplicas := pcsg.Status.AvailableReplicas availableReplicas := pcsg.Status.AvailableReplicas
updatedReplicas := pcsg.Status.UpdatedReplicas
replicas := pcsg.Status.Replicas
observedGeneration := pcsg.Status.ObservedGeneration
generation := pcsg.Generation
logger.V(1).Info("CheckPCSGFullyUpdated",
"resourceName", resourceName,
"generation", pcsg.Generation,
"observedGeneration", pcsg.Status.ObservedGeneration,
"desiredReplicas", desiredReplicas,
"availableReplicas", availableReplicas,
"updatedReplicas", updatedReplicas,
"replicas", replicas,
)
serviceStatus := v1alpha1.ServiceReplicaStatus{ serviceStatus := v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup, ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
...@@ -151,6 +204,16 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam ...@@ -151,6 +204,16 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam
AvailableReplicas: &availableReplicas, AvailableReplicas: &availableReplicas,
} }
if observedGeneration == nil {
logger.V(1).Info("PodCliqueScalingGroup observedGeneration is nil", "resourceName", resourceName)
return false, "observedGeneration is nil", serviceStatus
}
if observedGeneration != nil && *observedGeneration < generation {
logger.V(1).Info("PodCliqueScalingGroup spec not yet processed", "resourceName", resourceName, "generation", generation, "observedGeneration", observedGeneration)
return false, fmt.Sprintf("spec not yet processed: generation=%d, observedGeneration=%d", generation, *observedGeneration), serviceStatus
}
if desiredReplicas == 0 { if desiredReplicas == 0 {
// No replicas desired, so it's ready // No replicas desired, so it's ready
return true, "", serviceStatus return true, "", serviceStatus
...@@ -161,6 +224,16 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam ...@@ -161,6 +224,16 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam
return false, fmt.Sprintf("desired=%d, available=%d", desiredReplicas, availableReplicas), serviceStatus return false, fmt.Sprintf("desired=%d, available=%d", desiredReplicas, availableReplicas), serviceStatus
} }
if desiredReplicas != updatedReplicas {
logger.V(1).Info("PodCliqueScalingGroup not fully updated", "resourceName", resourceName, "desired", desiredReplicas, "updated", updatedReplicas)
return false, fmt.Sprintf("desired=%d, updated=%d", desiredReplicas, updatedReplicas), serviceStatus
}
if replicas != desiredReplicas {
logger.V(1).Info("PodCliqueScalingGroup performing rolling update", "resourceName", resourceName, "desired", desiredReplicas, "replicas", replicas)
return false, fmt.Sprintf("performing rolling update: desired=%d, replicas=%d", desiredReplicas, replicas), serviceStatus
}
return true, "", serviceStatus return true, "", serviceStatus
} }
......
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log"
) )
func TestResolveKaiSchedulerQueueName(t *testing.T) { func TestResolveKaiSchedulerQueueName(t *testing.T) {
...@@ -318,6 +319,536 @@ func TestEnsureQueueExists(t *testing.T) { ...@@ -318,6 +319,536 @@ func TestEnsureQueueExists(t *testing.T) {
} }
} }
// TestCheckPodCliqueReady runs CheckPodCliqueReady against a fake client that is
// optionally seeded with a single PodClique, and verifies the readiness flag,
// the not-ready reason, and the reported ServiceReplicaStatus for each scenario.
func TestCheckPodCliqueReady(t *testing.T) {
	const ns = "default"
	ctx := context.Background()

	// fixture assembles a PodClique in the test namespace from its parts.
	fixture := func(name string, generation int64, spec grovev1alpha1.PodCliqueSpec, status grovev1alpha1.PodCliqueStatus) *grovev1alpha1.PodClique {
		return &grovev1alpha1.PodClique{
			ObjectMeta: metav1.ObjectMeta{
				Name:       name,
				Namespace:  ns,
				Generation: generation,
			},
			Spec:   spec,
			Status: status,
		}
	}

	cases := []struct {
		name               string
		resourceName       string
		existingPodClique  *grovev1alpha1.PodClique
		wantReady          bool
		wantReasonContains string
		wantServiceStatus  v1alpha1.ServiceReplicaStatus
	}{
		{
			name:               "PodClique not found",
			resourceName:       "missing-podclique",
			wantReady:          false,
			wantReasonContains: "resource not found",
			wantServiceStatus:  v1alpha1.ServiceReplicaStatus{},
		},
		{
			name:         "PodClique fully ready",
			resourceName: "ready-podclique",
			existingPodClique: fixture("ready-podclique", 1,
				grovev1alpha1.PodCliqueSpec{Replicas: 3},
				grovev1alpha1.PodCliqueStatus{
					Replicas:           3,
					ReadyReplicas:      3,
					UpdatedReplicas:    3,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady: true,
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:   v1alpha1.ComponentKindPodClique,
				ComponentName:   "ready-podclique",
				Replicas:        3,
				UpdatedReplicas: 3,
				ReadyReplicas:   ptr.To(int32(3)),
			},
		},
		{
			name:         "PodClique with zero replicas desired",
			resourceName: "zero-replicas-podclique",
			existingPodClique: fixture("zero-replicas-podclique", 1,
				grovev1alpha1.PodCliqueSpec{Replicas: 0},
				grovev1alpha1.PodCliqueStatus{
					Replicas:           0,
					ReadyReplicas:      0,
					UpdatedReplicas:    0,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady: true,
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:   v1alpha1.ComponentKindPodClique,
				ComponentName:   "zero-replicas-podclique",
				Replicas:        0,
				UpdatedReplicas: 0,
				ReadyReplicas:   ptr.To(int32(0)),
			},
		},
		{
			name:         "PodClique spec not yet processed - observedGeneration < generation",
			resourceName: "stale-podclique",
			existingPodClique: fixture("stale-podclique", 3,
				grovev1alpha1.PodCliqueSpec{Replicas: 2},
				grovev1alpha1.PodCliqueStatus{
					Replicas:           2,
					ReadyReplicas:      2,
					UpdatedReplicas:    2,
					ObservedGeneration: ptr.To(int64(2)),
				},
			),
			wantReady:          false,
			wantReasonContains: "spec not yet processed",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:   v1alpha1.ComponentKindPodClique,
				ComponentName:   "stale-podclique",
				Replicas:        2,
				UpdatedReplicas: 2,
				ReadyReplicas:   ptr.To(int32(2)),
			},
		},
		{
			name:         "PodClique not ready - ready replicas less than desired",
			resourceName: "not-ready-podclique",
			existingPodClique: fixture("not-ready-podclique", 1,
				grovev1alpha1.PodCliqueSpec{Replicas: 3},
				grovev1alpha1.PodCliqueStatus{
					Replicas:           3,
					ReadyReplicas:      1,
					UpdatedReplicas:    3,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady:          false,
			wantReasonContains: "desired=3, ready=1",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:   v1alpha1.ComponentKindPodClique,
				ComponentName:   "not-ready-podclique",
				Replicas:        3,
				UpdatedReplicas: 3,
				ReadyReplicas:   ptr.To(int32(1)),
			},
		},
		{
			name:         "PodClique not fully updated - updated replicas less than desired",
			resourceName: "not-updated-podclique",
			existingPodClique: fixture("not-updated-podclique", 1,
				grovev1alpha1.PodCliqueSpec{Replicas: 3},
				grovev1alpha1.PodCliqueStatus{
					Replicas:           3,
					ReadyReplicas:      3,
					UpdatedReplicas:    2,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady:          false,
			wantReasonContains: "desired=3, updated=2",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:   v1alpha1.ComponentKindPodClique,
				ComponentName:   "not-updated-podclique",
				Replicas:        3,
				UpdatedReplicas: 2,
				ReadyReplicas:   ptr.To(int32(3)),
			},
		},
		{
			name:         "PodClique performing rolling update - replicas != desired",
			resourceName: "rolling-update-podclique",
			existingPodClique: fixture("rolling-update-podclique", 1,
				grovev1alpha1.PodCliqueSpec{Replicas: 3},
				grovev1alpha1.PodCliqueStatus{
					Replicas:           4,
					ReadyReplicas:      3,
					UpdatedReplicas:    3,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady:          false,
			wantReasonContains: "performing rolling update",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:   v1alpha1.ComponentKindPodClique,
				ComponentName:   "rolling-update-podclique",
				Replicas:        4,
				UpdatedReplicas: 3,
				ReadyReplicas:   ptr.To(int32(3)),
			},
		},
		{
			name:         "PodClique with nil observedGeneration",
			resourceName: "nil-observed-gen-podclique",
			existingPodClique: fixture("nil-observed-gen-podclique", 1,
				grovev1alpha1.PodCliqueSpec{Replicas: 2},
				grovev1alpha1.PodCliqueStatus{
					Replicas:           2,
					ReadyReplicas:      2,
					UpdatedReplicas:    2,
					ObservedGeneration: nil,
				},
			),
			wantReady:          false,
			wantReasonContains: "observedGeneration is nil",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:   v1alpha1.ComponentKindPodClique,
				ComponentName:   "nil-observed-gen-podclique",
				Replicas:        2,
				UpdatedReplicas: 2,
				ReadyReplicas:   ptr.To(int32(2)),
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			g := gomega.NewGomegaWithT(t)

			// Register both API groups so the fake client can serve the fixture.
			testScheme := scheme.Scheme
			g.Expect(v1alpha1.AddToScheme(testScheme)).NotTo(gomega.HaveOccurred())
			g.Expect(grovev1alpha1.AddToScheme(testScheme)).NotTo(gomega.HaveOccurred())

			// Seed the client only when the scenario provides a PodClique.
			var seed []client.Object
			if tc.existingPodClique != nil {
				seed = append(seed, tc.existingPodClique)
			}
			builder := fake.NewClientBuilder().WithScheme(testScheme)
			builder = builder.WithObjects(seed...)
			builder = builder.WithStatusSubresource(seed...)
			kubeClient := builder.Build()

			gotReady, gotReason, gotStatus := CheckPodCliqueReady(ctx, kubeClient, tc.resourceName, ns, log.FromContext(ctx))

			g.Expect(gotReady).To(gomega.Equal(tc.wantReady))
			if tc.wantReasonContains == "" {
				g.Expect(gotReason).To(gomega.Equal(""))
			} else {
				g.Expect(gotReason).To(gomega.ContainSubstring(tc.wantReasonContains))
			}
			g.Expect(gotStatus).To(gomega.Equal(tc.wantServiceStatus))
		})
	}
}
// TestCheckPCSGReady runs CheckPCSGReady against a fake client that is
// optionally seeded with a single PodCliqueScalingGroup, and verifies the
// readiness flag, the not-ready reason, and the reported ServiceReplicaStatus
// for each scenario.
func TestCheckPCSGReady(t *testing.T) {
	const ns = "default"
	ctx := context.Background()

	// fixture assembles a PodCliqueScalingGroup in the test namespace from its parts.
	fixture := func(name string, generation int64, spec grovev1alpha1.PodCliqueScalingGroupSpec, status grovev1alpha1.PodCliqueScalingGroupStatus) *grovev1alpha1.PodCliqueScalingGroup {
		return &grovev1alpha1.PodCliqueScalingGroup{
			ObjectMeta: metav1.ObjectMeta{
				Name:       name,
				Namespace:  ns,
				Generation: generation,
			},
			Spec:   spec,
			Status: status,
		}
	}

	cases := []struct {
		name               string
		resourceName       string
		existingPCSG       *grovev1alpha1.PodCliqueScalingGroup
		wantReady          bool
		wantReasonContains string
		wantServiceStatus  v1alpha1.ServiceReplicaStatus
	}{
		{
			name:               "PCSG not found",
			resourceName:       "missing-pcsg",
			wantReady:          false,
			wantReasonContains: "resource not found",
			wantServiceStatus:  v1alpha1.ServiceReplicaStatus{},
		},
		{
			name:         "PCSG fully ready",
			resourceName: "ready-pcsg",
			existingPCSG: fixture("ready-pcsg", 1,
				grovev1alpha1.PodCliqueScalingGroupSpec{Replicas: 3},
				grovev1alpha1.PodCliqueScalingGroupStatus{
					Replicas:           3,
					AvailableReplicas:  3,
					UpdatedReplicas:    3,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady: true,
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:     v1alpha1.ComponentKindPodCliqueScalingGroup,
				ComponentName:     "ready-pcsg",
				Replicas:          3,
				UpdatedReplicas:   3,
				AvailableReplicas: ptr.To(int32(3)),
			},
		},
		{
			name:         "PCSG with zero replicas desired",
			resourceName: "zero-replicas-pcsg",
			existingPCSG: fixture("zero-replicas-pcsg", 1,
				grovev1alpha1.PodCliqueScalingGroupSpec{Replicas: 0},
				grovev1alpha1.PodCliqueScalingGroupStatus{
					Replicas:           0,
					AvailableReplicas:  0,
					UpdatedReplicas:    0,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady: true,
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:     v1alpha1.ComponentKindPodCliqueScalingGroup,
				ComponentName:     "zero-replicas-pcsg",
				Replicas:          0,
				UpdatedReplicas:   0,
				AvailableReplicas: ptr.To(int32(0)),
			},
		},
		{
			name:         "PCSG spec not yet processed - observedGeneration < generation",
			resourceName: "stale-pcsg",
			existingPCSG: fixture("stale-pcsg", 3,
				grovev1alpha1.PodCliqueScalingGroupSpec{Replicas: 2},
				grovev1alpha1.PodCliqueScalingGroupStatus{
					Replicas:           2,
					AvailableReplicas:  2,
					UpdatedReplicas:    2,
					ObservedGeneration: ptr.To(int64(2)),
				},
			),
			wantReady:          false,
			wantReasonContains: "spec not yet processed",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:     v1alpha1.ComponentKindPodCliqueScalingGroup,
				ComponentName:     "stale-pcsg",
				Replicas:          2,
				UpdatedReplicas:   2,
				AvailableReplicas: ptr.To(int32(2)),
			},
		},
		{
			name:         "PCSG not ready - available replicas less than desired",
			resourceName: "not-ready-pcsg",
			existingPCSG: fixture("not-ready-pcsg", 1,
				grovev1alpha1.PodCliqueScalingGroupSpec{Replicas: 3},
				grovev1alpha1.PodCliqueScalingGroupStatus{
					Replicas:           3,
					AvailableReplicas:  1,
					UpdatedReplicas:    3,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady:          false,
			wantReasonContains: "desired=3, available=1",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:     v1alpha1.ComponentKindPodCliqueScalingGroup,
				ComponentName:     "not-ready-pcsg",
				Replicas:          3,
				UpdatedReplicas:   3,
				AvailableReplicas: ptr.To(int32(1)),
			},
		},
		{
			name:         "PCSG not fully updated - updated replicas less than desired",
			resourceName: "not-updated-pcsg",
			existingPCSG: fixture("not-updated-pcsg", 1,
				grovev1alpha1.PodCliqueScalingGroupSpec{Replicas: 3},
				grovev1alpha1.PodCliqueScalingGroupStatus{
					Replicas:           3,
					AvailableReplicas:  3,
					UpdatedReplicas:    2,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady:          false,
			wantReasonContains: "desired=3, updated=2",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:     v1alpha1.ComponentKindPodCliqueScalingGroup,
				ComponentName:     "not-updated-pcsg",
				Replicas:          3,
				UpdatedReplicas:   2,
				AvailableReplicas: ptr.To(int32(3)),
			},
		},
		{
			name:         "PCSG performing rolling update - replicas != desired",
			resourceName: "rolling-update-pcsg",
			existingPCSG: fixture("rolling-update-pcsg", 1,
				grovev1alpha1.PodCliqueScalingGroupSpec{Replicas: 3},
				grovev1alpha1.PodCliqueScalingGroupStatus{
					Replicas:           4,
					AvailableReplicas:  3,
					UpdatedReplicas:    3,
					ObservedGeneration: ptr.To(int64(1)),
				},
			),
			wantReady:          false,
			wantReasonContains: "performing rolling update",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:     v1alpha1.ComponentKindPodCliqueScalingGroup,
				ComponentName:     "rolling-update-pcsg",
				Replicas:          4,
				UpdatedReplicas:   3,
				AvailableReplicas: ptr.To(int32(3)),
			},
		},
		{
			name:         "PCSG with nil observedGeneration",
			resourceName: "nil-observed-gen-pcsg",
			existingPCSG: fixture("nil-observed-gen-pcsg", 1,
				grovev1alpha1.PodCliqueScalingGroupSpec{Replicas: 2},
				grovev1alpha1.PodCliqueScalingGroupStatus{
					Replicas:           2,
					AvailableReplicas:  2,
					UpdatedReplicas:    2,
					ObservedGeneration: nil,
				},
			),
			wantReady:          false,
			wantReasonContains: "observedGeneration is nil",
			wantServiceStatus: v1alpha1.ServiceReplicaStatus{
				ComponentKind:     v1alpha1.ComponentKindPodCliqueScalingGroup,
				ComponentName:     "nil-observed-gen-pcsg",
				Replicas:          2,
				UpdatedReplicas:   2,
				AvailableReplicas: ptr.To(int32(2)),
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			g := gomega.NewGomegaWithT(t)

			// Register both API groups so the fake client can serve the fixture.
			testScheme := scheme.Scheme
			g.Expect(v1alpha1.AddToScheme(testScheme)).NotTo(gomega.HaveOccurred())
			g.Expect(grovev1alpha1.AddToScheme(testScheme)).NotTo(gomega.HaveOccurred())

			// Seed the client only when the scenario provides a PodCliqueScalingGroup.
			var seed []client.Object
			if tc.existingPCSG != nil {
				seed = append(seed, tc.existingPCSG)
			}
			builder := fake.NewClientBuilder().WithScheme(testScheme)
			builder = builder.WithObjects(seed...)
			builder = builder.WithStatusSubresource(seed...)
			kubeClient := builder.Build()

			gotReady, gotReason, gotStatus := CheckPCSGReady(ctx, kubeClient, tc.resourceName, ns, log.FromContext(ctx))

			g.Expect(gotReady).To(gomega.Equal(tc.wantReady))
			if tc.wantReasonContains == "" {
				g.Expect(gotReason).To(gomega.Equal(""))
			} else {
				g.Expect(gotReason).To(gomega.ContainSubstring(tc.wantReasonContains))
			}
			g.Expect(gotStatus).To(gomega.Equal(tc.wantServiceStatus))
		})
	}
}
func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) { func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) {
ctx := context.Background() ctx := context.Background()
...@@ -351,9 +882,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) { ...@@ -351,9 +882,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) {
Replicas: 2, Replicas: 2,
}, },
Status: grovev1alpha1.PodCliqueStatus{ Status: grovev1alpha1.PodCliqueStatus{
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
ReadyReplicas: 1, ReadyReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
}, },
...@@ -403,9 +935,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) { ...@@ -403,9 +935,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) {
Replicas: 2, Replicas: 2,
}, },
Status: grovev1alpha1.PodCliqueScalingGroupStatus{ Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
AvailableReplicas: 2, AvailableReplicas: 2,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
&grovev1alpha1.PodCliqueScalingGroup{ &grovev1alpha1.PodCliqueScalingGroup{
...@@ -417,9 +950,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) { ...@@ -417,9 +950,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) {
Replicas: 3, Replicas: 3,
}, },
Status: grovev1alpha1.PodCliqueScalingGroupStatus{ Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 3, Replicas: 3,
UpdatedReplicas: 3, UpdatedReplicas: 3,
AvailableReplicas: 3, AvailableReplicas: 3,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
}, },
...@@ -467,9 +1001,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) { ...@@ -467,9 +1001,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) {
Replicas: 2, Replicas: 2,
}, },
Status: grovev1alpha1.PodCliqueScalingGroupStatus{ Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
AvailableReplicas: 1, AvailableReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
}, },
...@@ -525,9 +1060,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) { ...@@ -525,9 +1060,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) {
Replicas: 1, Replicas: 1,
}, },
Status: grovev1alpha1.PodCliqueStatus{ Status: grovev1alpha1.PodCliqueStatus{
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: 1, ReadyReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
&grovev1alpha1.PodCliqueScalingGroup{ &grovev1alpha1.PodCliqueScalingGroup{
...@@ -539,9 +1075,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) { ...@@ -539,9 +1075,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) {
Replicas: 2, Replicas: 2,
}, },
Status: grovev1alpha1.PodCliqueScalingGroupStatus{ Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
AvailableReplicas: 1, AvailableReplicas: 1,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
&grovev1alpha1.PodCliqueScalingGroup{ &grovev1alpha1.PodCliqueScalingGroup{
...@@ -553,9 +1090,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) { ...@@ -553,9 +1090,10 @@ func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) {
Replicas: 2, Replicas: 2,
}, },
Status: grovev1alpha1.PodCliqueScalingGroupStatus{ Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
AvailableReplicas: 2, AvailableReplicas: 2,
ObservedGeneration: ptr.To(int64(1)),
}, },
}, },
}, },
......
...@@ -54,6 +54,11 @@ func (v *DynamoGraphDeploymentValidator) Validate() (admission.Warnings, error) ...@@ -54,6 +54,11 @@ func (v *DynamoGraphDeploymentValidator) Validate() (admission.Warnings, error)
return nil, err return nil, err
} }
// Validate restart
if err := v.validateRestart(); err != nil {
return nil, err
}
var allWarnings admission.Warnings var allWarnings admission.Warnings
// Validate each service // Validate each service
...@@ -263,6 +268,64 @@ func (v *DynamoGraphDeploymentValidator) validatePVC(index int, pvc *nvidiacomv1 ...@@ -263,6 +268,64 @@ func (v *DynamoGraphDeploymentValidator) validatePVC(index int, pvc *nvidiacomv1
return err return err
} }
// validateRestart validates spec.restart when it is present.
//
// It returns nil when no restart block is set. Otherwise it reports an empty
// ID and any error from validateRestartStrategyOrder, joined into one error.
func (v *DynamoGraphDeploymentValidator) validateRestart() error {
	spec := v.deployment.Spec.Restart
	if spec == nil {
		return nil
	}

	var idErr error
	if spec.ID == "" {
		idErr = errors.Join(errors.New("spec.restart.id is required"))
	}

	// Both checks always run so the caller sees every problem at once.
	orderErr := v.validateRestartStrategyOrder()
	return errors.Join(idErr, orderErr)
}
// validateRestartStrategyOrder validates spec.restart.strategy.order when it is set.
//
// It returns nil when there is no strategy or the order is empty. An order
// combined with the parallel strategy type is rejected immediately. Otherwise
// all violations are accumulated and returned together: duplicate entries, a
// unique-entry count that differs from the number of services, and entries
// that do not name a key of spec.services.
//
// spec.restart must be non-nil; it is dereferenced without a check.
func (v *DynamoGraphDeploymentValidator) validateRestartStrategyOrder() error {
	strategy := v.deployment.Spec.Restart.Strategy
	if strategy == nil {
		return nil
	}
	order := strategy.Order
	if len(order) == 0 {
		return nil
	}
	if strategy.Type == nvidiacomv1alpha1.RestartStrategyTypeParallel {
		return errors.New("spec.restart.strategy.order cannot be specified when strategy is parallel")
	}

	services := v.deployment.Spec.Services
	distinct := getUnique(order)

	var problems error
	if len(order) != len(distinct) {
		problems = errors.Join(problems, errors.New("spec.restart.strategy.order must be unique"))
	}
	if len(services) != len(distinct) {
		problems = errors.Join(problems, errors.New("spec.restart.strategy.order must have the same number of unique services as the deployment"))
	}

	// Report each unknown name once, in the order it first appears.
	for i := range distinct {
		name := distinct[i]
		if _, known := services[name]; known {
			continue
		}
		problems = errors.Join(problems, fmt.Errorf("spec.restart.strategy.order contains unknown service: %s", name))
	}
	return problems
}
// getUnique returns the distinct elements of slice, keeping the position of
// each element's first occurrence. The result is always a non-nil slice, even
// when the input is nil or empty.
func getUnique[T comparable](slice []T) []T {
	result := make([]T, 0, len(slice))
	visited := make(map[T]struct{}, len(slice))
	for i := range slice {
		item := slice[i]
		if _, dup := visited[item]; dup {
			continue
		}
		visited[item] = struct{}{}
		result = append(result, item)
	}
	return result
}
// getServiceNames extracts service names from a services map. // getServiceNames extracts service names from a services map.
// Returns a set-like map for efficient lookup and comparison. // Returns a set-like map for efficient lookup and comparison.
func getServiceNames(services map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec) map[string]struct{} { func getServiceNames(services map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec) map[string]struct{} {
......
...@@ -322,6 +322,190 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) { ...@@ -322,6 +322,190 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
wantErr: true, wantErr: true,
errMsg: "spec.services[main].sharedMemory.size is required when disabled is false", errMsg: "spec.services[main].sharedMemory.size is required when disabled is false",
}, },
// Restart validation test cases
{
name: "restart with nil at",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
Restart: &nvidiacomv1alpha1.Restart{
ID: "",
},
},
},
wantErr: true,
errMsg: "spec.restart.id is required",
},
{
name: "restart with valid id and no strategy",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
Restart: &nvidiacomv1alpha1.Restart{
ID: "restart-id",
},
},
},
wantErr: false,
},
{
name: "restart with parallel strategy and order specified",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
"prefill": {},
},
Restart: &nvidiacomv1alpha1.Restart{
ID: "restart-id",
Strategy: &nvidiacomv1alpha1.RestartStrategy{
Type: nvidiacomv1alpha1.RestartStrategyTypeParallel,
Order: []string{"main", "prefill"},
},
},
},
},
wantErr: true,
errMsg: "spec.restart.strategy.order cannot be specified when strategy is parallel",
},
{
name: "restart with sequential strategy and duplicate services in order",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
"prefill": {},
},
Restart: &nvidiacomv1alpha1.Restart{
ID: "restart-id",
Strategy: &nvidiacomv1alpha1.RestartStrategy{
Type: nvidiacomv1alpha1.RestartStrategyTypeSequential,
Order: []string{"main", "main", "prefill"},
},
},
},
},
wantErr: true,
errMsg: "spec.restart.strategy.order must be unique",
errContains: true,
},
{
name: "restart with sequential strategy and unknown service in order",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
"prefill": {},
},
Restart: &nvidiacomv1alpha1.Restart{
ID: "restart-id",
Strategy: &nvidiacomv1alpha1.RestartStrategy{
Type: nvidiacomv1alpha1.RestartStrategyTypeSequential,
Order: []string{"main", "unknown"},
},
},
},
},
wantErr: true,
errMsg: "spec.restart.strategy.order contains unknown service: unknown",
errContains: true,
},
{
name: "restart with sequential strategy and missing service in order",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
"prefill": {},
"decode": {},
},
Restart: &nvidiacomv1alpha1.Restart{
ID: "restart-id",
Strategy: &nvidiacomv1alpha1.RestartStrategy{
Type: nvidiacomv1alpha1.RestartStrategyTypeSequential,
Order: []string{"main", "prefill"},
},
},
},
},
wantErr: true,
errMsg: "spec.restart.strategy.order must have the same number of unique services as the deployment",
errContains: true,
},
{
name: "restart with valid sequential strategy and order",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
"prefill": {},
"decode": {},
},
Restart: &nvidiacomv1alpha1.Restart{
ID: "restart-id",
Strategy: &nvidiacomv1alpha1.RestartStrategy{
Type: nvidiacomv1alpha1.RestartStrategyTypeSequential,
Order: []string{"prefill", "decode", "main"},
},
},
},
},
wantErr: false,
},
{
name: "restart with sequential strategy and empty order is valid",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
Restart: &nvidiacomv1alpha1.Restart{
ID: "restart-id",
Strategy: &nvidiacomv1alpha1.RestartStrategy{
Type: nvidiacomv1alpha1.RestartStrategyTypeSequential,
Order: []string{},
},
},
},
},
wantErr: false,
},
} }
for _, tt := range tests { for _, tt := range tests {
...@@ -791,7 +975,7 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) { ...@@ -791,7 +975,7 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) {
errMsg: "spec.services[main] cannot change node topology (between single-node and multi-node) after creation", errMsg: "spec.services[main] cannot change node topology (between single-node and multi-node) after creation",
}, },
{ {
name: "adding new service with multinode is allowed", name: "adding new service with multinode is not allowed", // service topology is immutable
oldDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{ oldDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{ Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "sglang", BackendFramework: "sglang",
...@@ -810,7 +994,6 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) { ...@@ -810,7 +994,6 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) {
Multinode: nil, Multinode: nil,
}, },
"decode": { "decode": {
// New service with multinode - should be allowed
Multinode: &nvidiacomv1alpha1.MultinodeSpec{ Multinode: &nvidiacomv1alpha1.MultinodeSpec{
NodeCount: 4, NodeCount: 4,
}, },
...@@ -818,10 +1001,11 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) { ...@@ -818,10 +1001,11 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) {
}, },
}, },
}, },
wantErr: false, wantErr: true,
errMsg: "service topology is immutable and cannot be modified after creation: services added: [decode]",
}, },
{ {
name: "adding new service without multinode is allowed", name: "adding new service without multinode is not allowed", // service topology is immutable
oldDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{ oldDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{ Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "sglang", BackendFramework: "sglang",
...@@ -850,7 +1034,8 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) { ...@@ -850,7 +1034,8 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) {
}, },
}, },
}, },
wantErr: false, wantErr: true,
errMsg: "service topology is immutable and cannot be modified after creation: services added: [gateway]",
}, },
} }
......
...@@ -434,6 +434,7 @@ _Appears in:_ ...@@ -434,6 +434,7 @@ _Appears in:_
| `services` _object (keys:string, values:[DynamoComponentDeploymentSharedSpec](#dynamocomponentdeploymentsharedspec))_ | Services are the services to deploy as part of this deployment. | | MaxProperties: 25 <br />Optional: \{\} <br /> | | `services` _object (keys:string, values:[DynamoComponentDeploymentSharedSpec](#dynamocomponentdeploymentsharedspec))_ | Services are the services to deploy as part of this deployment. | | MaxProperties: 25 <br />Optional: \{\} <br /> |
| `envs` _[EnvVar](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#envvar-v1-core) array_ | Envs are environment variables applied to all services in the deployment unless<br />overridden by service-specific configuration. | | Optional: \{\} <br /> | | `envs` _[EnvVar](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#envvar-v1-core) array_ | Envs are environment variables applied to all services in the deployment unless<br />overridden by service-specific configuration. | | Optional: \{\} <br /> |
| `backendFramework` _string_ | BackendFramework specifies the backend framework (e.g., "sglang", "vllm", "trtllm"). | | Enum: [sglang vllm trtllm] <br /> | | `backendFramework` _string_ | BackendFramework specifies the backend framework (e.g., "sglang", "vllm", "trtllm"). | | Enum: [sglang vllm trtllm] <br /> |
| `restart` _[Restart](#restart)_ | Restart specifies the restart policy for the graph deployment. | | Optional: \{\} <br /> |
#### DynamoGraphDeploymentStatus #### DynamoGraphDeploymentStatus
...@@ -452,6 +453,7 @@ _Appears in:_ ...@@ -452,6 +453,7 @@ _Appears in:_
| `state` _string_ | State is a high-level textual status of the graph deployment lifecycle. | | | | `state` _string_ | State is a high-level textual status of the graph deployment lifecycle. | | |
| `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#condition-v1-meta) array_ | Conditions contains the latest observed conditions of the graph deployment.<br />The slice is merged by type on patch updates. | | | | `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#condition-v1-meta) array_ | Conditions contains the latest observed conditions of the graph deployment.<br />The slice is merged by type on patch updates. | | |
| `services` _object (keys:string, values:[ServiceReplicaStatus](#servicereplicastatus))_ | Services contains per-service replica status information.<br />The map key is the service name from spec.services. | | | | `services` _object (keys:string, values:[ServiceReplicaStatus](#servicereplicastatus))_ | Services contains per-service replica status information.<br />The map key is the service name from spec.services. | | |
| `restart` _[RestartStatus](#restartstatus)_ | Restart contains the status of the restart of the graph deployment. | | |
#### DynamoModel #### DynamoModel
...@@ -742,6 +744,94 @@ _Appears in:_ ...@@ -742,6 +744,94 @@ _Appears in:_
| `claims` _[ResourceClaim](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#resourceclaim-v1-core) array_ | Claims specifies resource claims for dynamic resource allocation | | | | `claims` _[ResourceClaim](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#resourceclaim-v1-core) array_ | Claims specifies resource claims for dynamic resource allocation | | |
#### Restart
_Appears in:_
- [DynamoGraphDeploymentSpec](#dynamographdeploymentspec)
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `id` _string_ | ID is an arbitrary string that triggers a restart when changed.<br />Any modification to this value will initiate a restart of the graph deployment according to the strategy. | | MinLength: 1 <br />Required: \{\} <br /> |
| `strategy` _[RestartStrategy](#restartstrategy)_ | Strategy specifies the restart strategy for the graph deployment. | | Optional: \{\} <br /> |
#### RestartPhase
_Underlying type:_ _string_
_Appears in:_
- [RestartStatus](#restartstatus)
| Field | Description |
| --- | --- |
| `Pending` | |
| `Restarting` | |
| `Completed` | |
| `Failed` | |
#### RestartStatus
RestartStatus contains the status of the restart of the graph deployment.
_Appears in:_
- [DynamoGraphDeploymentStatus](#dynamographdeploymentstatus)
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `observedID` _string_ | ObservedID is the restart ID that has been observed and is being processed.<br />Matches the Restart.ID field in the spec. | | |
| `phase` _[RestartPhase](#restartphase)_ | Phase is the phase of the restart. | | |
| `inProgress` _string array_ | InProgress contains the names of the services that are currently being restarted. | | |
#### RestartStrategy
_Appears in:_
- [Restart](#restart)
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `type` _[RestartStrategyType](#restartstrategytype)_ | Type specifies the restart strategy type. | Sequential | Enum: [Sequential Parallel] <br /> |
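| `order` _string array_ | Order specifies the order in which the services should be restarted.<br />When set, it must list every service in `spec.services` exactly once (no duplicates, no unknown names), and it cannot be specified when `type` is `Parallel`. | | Optional: \{\} <br /> |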
#### RestartStrategyType
_Underlying type:_ _string_
_Appears in:_
- [RestartStrategy](#restartstrategy)
| Field | Description |
| --- | --- |
| `Sequential` | |
| `Parallel` | |
#### ScalingAdapter #### ScalingAdapter
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment