Unverified Commit 97f79537 authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

feat(operator): managed rolling updates for DGD worker deployments (#6110)


Signed-off-by: default avatartmontfort <tmontfort@nvidia.com>
parent 0d9eb99d
...@@ -11171,8 +11171,19 @@ spec: ...@@ -11171,8 +11171,19 @@ spec:
- LeaderWorkerSet - LeaderWorkerSet
type: string type: string
componentName: componentName:
description: ComponentName is the name of the underlying resource. description: |-
ComponentName is the name of the primary underlying resource.
DEPRECATED: Use ComponentNames instead. This field will be removed in a future release.
During rolling updates, this reflects the new (target) component name.
type: string type: string
componentNames:
description: |-
ComponentNames is the list of underlying resource names for this service.
During normal operation, this contains a single name.
During rolling updates, this contains both old and new component names.
items:
type: string
type: array
readyReplicas: readyReplicas:
description: |- description: |-
ReadyReplicas is the number of ready replicas. ReadyReplicas is the number of ready replicas.
......
...@@ -11386,6 +11386,37 @@ spec: ...@@ -11386,6 +11386,37 @@ spec:
description: Phase is the phase of the restart. description: Phase is the phase of the restart.
type: string type: string
type: object type: object
rollingUpdate:
description: |-
RollingUpdate tracks the progress of operator managed rolling updates.
Currently only supported for single-node, non-Grove deployments (DCD/Deployment).
properties:
endTime:
description: EndTime is when the rolling update completed (successfully or failed).
format: date-time
type: string
phase:
description: Phase indicates the current phase of the rolling update.
enum:
- Pending
- InProgress
- Completed
- Failed
- ""
type: string
startTime:
description: StartTime is when the rolling update began.
format: date-time
type: string
updatedServices:
description: |-
UpdatedServices is the list of services that have completed the rolling update.
A service is considered updated when its new replicas are all ready and old replicas are fully scaled down.
Only services of componentType Worker (or Prefill/Decode) are considered.
items:
type: string
type: array
type: object
services: services:
additionalProperties: additionalProperties:
description: ServiceReplicaStatus contains replica information for a single service. description: ServiceReplicaStatus contains replica information for a single service.
...@@ -11409,8 +11440,19 @@ spec: ...@@ -11409,8 +11440,19 @@ spec:
- LeaderWorkerSet - LeaderWorkerSet
type: string type: string
componentName: componentName:
description: ComponentName is the name of the underlying resource. description: |-
ComponentName is the name of the primary underlying resource.
DEPRECATED: Use ComponentNames instead. This field will be removed in a future release.
During rolling updates, this reflects the new (target) component name.
type: string type: string
componentNames:
description: |-
ComponentNames is the list of underlying resource names for this service.
During normal operation, this contains a single name.
During rolling updates, this contains both old and new component names.
items:
type: string
type: array
readyReplicas: readyReplicas:
description: |- description: |-
ReadyReplicas is the number of ready replicas. ReadyReplicas is the number of ready replicas.
......
...@@ -109,7 +109,6 @@ type DynamoGraphDeploymentStatus struct { ...@@ -109,7 +109,6 @@ type DynamoGraphDeploymentStatus struct {
// The map key is the service name from spec.services. // The map key is the service name from spec.services.
// +optional // +optional
Services map[string]ServiceReplicaStatus `json:"services,omitempty"` Services map[string]ServiceReplicaStatus `json:"services,omitempty"`
// Restart contains the status of the restart of the graph deployment. // Restart contains the status of the restart of the graph deployment.
// +optional // +optional
Restart *RestartStatus `json:"restart,omitempty"` Restart *RestartStatus `json:"restart,omitempty"`
...@@ -117,6 +116,10 @@ type DynamoGraphDeploymentStatus struct { ...@@ -117,6 +116,10 @@ type DynamoGraphDeploymentStatus struct {
// The map key is the service name from spec.services. // The map key is the service name from spec.services.
// +optional // +optional
Checkpoints map[string]ServiceCheckpointStatus `json:"checkpoints,omitempty"` Checkpoints map[string]ServiceCheckpointStatus `json:"checkpoints,omitempty"`
// RollingUpdate tracks the progress of operator managed rolling updates.
// Currently only supported for single-node, non-Grove deployments (DCD/Deployment).
// +optional
RollingUpdate *RollingUpdateStatus `json:"rollingUpdate,omitempty"`
} }
// ServiceCheckpointStatus contains checkpoint information for a single service. // ServiceCheckpointStatus contains checkpoint information for a single service.
...@@ -151,15 +154,58 @@ const ( ...@@ -151,15 +154,58 @@ const (
RestartPhaseRestarting RestartPhase = "Restarting" RestartPhaseRestarting RestartPhase = "Restarting"
RestartPhaseCompleted RestartPhase = "Completed" RestartPhaseCompleted RestartPhase = "Completed"
RestartPhaseFailed RestartPhase = "Failed" RestartPhaseFailed RestartPhase = "Failed"
RestartPhaseSuperseded RestartPhase = "Superseded"
) )
// RollingUpdatePhase represents the current phase of a rolling update.
// +kubebuilder:validation:Enum=Pending;InProgress;Completed;Failed;""
type RollingUpdatePhase string

// Every value admitted by the validation enum on RollingUpdatePhase has a
// matching constant here, so callers never need to spell a phase as a raw
// string literal.
const (
	RollingUpdatePhasePending    RollingUpdatePhase = "Pending"
	RollingUpdatePhaseInProgress RollingUpdatePhase = "InProgress"
	RollingUpdatePhaseCompleted  RollingUpdatePhase = "Completed"
	// RollingUpdatePhaseFailed marks a rolling update that ended unsuccessfully.
	// The validation enum already accepts "Failed"; this constant gives that
	// value a name.
	RollingUpdatePhaseFailed RollingUpdatePhase = "Failed"
	// RollingUpdatePhaseNone is the empty phase, i.e. the zero value of the type.
	RollingUpdatePhaseNone RollingUpdatePhase = ""
)
// RollingUpdateStatus tracks the progress of a rolling update.
// It is exposed through the optional RollingUpdate field of
// DynamoGraphDeploymentStatus. Every field is optional and is omitted from the
// serialized JSON when it holds its zero value.
type RollingUpdateStatus struct {
// Phase indicates the current phase of the rolling update.
// +optional
Phase RollingUpdatePhase `json:"phase,omitempty"`
// StartTime is when the rolling update began.
// +optional
StartTime *metav1.Time `json:"startTime,omitempty"`
// EndTime is when the rolling update completed (successfully or failed).
// +optional
EndTime *metav1.Time `json:"endTime,omitempty"`
// UpdatedServices is the list of services that have completed the rolling update.
// A service is considered updated when its new replicas are all ready and old replicas are fully scaled down.
// Only services of componentType Worker (or Prefill/Decode) are considered.
// +optional
UpdatedServices []string `json:"updatedServices,omitempty"`
}
// ServiceReplicaStatus contains replica information for a single service. // ServiceReplicaStatus contains replica information for a single service.
type ServiceReplicaStatus struct { type ServiceReplicaStatus struct {
// ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet"). // ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet").
ComponentKind ComponentKind `json:"componentKind"` ComponentKind ComponentKind `json:"componentKind"`
// ComponentName is the name of the underlying resource.
// ComponentName is the name of the primary underlying resource.
// DEPRECATED: Use ComponentNames instead. This field will be removed in a future release.
// During rolling updates, this reflects the new (target) component name.
// +kubebuilder:deprecatedversion:warning="ComponentName is deprecated, view ComponentNames instead"
ComponentName string `json:"componentName"` ComponentName string `json:"componentName"`
// ComponentNames is the list of underlying resource names for this service.
// During normal operation, this contains a single name.
// During rolling updates, this contains both old and new component names.
// +optional
ComponentNames []string `json:"componentNames,omitempty"`
// Replicas is the total number of non-terminated replicas. // Replicas is the total number of non-terminated replicas.
// Required for all component kinds. // Required for all component kinds.
// +kubebuilder:validation:Minimum=0 // +kubebuilder:validation:Minimum=0
...@@ -264,11 +310,6 @@ func (s *DynamoGraphDeployment) HasAnyMultinodeService() bool { ...@@ -264,11 +310,6 @@ func (s *DynamoGraphDeployment) HasAnyMultinodeService() bool {
return false return false
} }
// GetDynamoNamespaceForService returns the Dynamo namespace for a given service.
func (s *DynamoGraphDeployment) GetDynamoNamespaceForService(service *DynamoComponentDeploymentSharedSpec) string {
return ComputeDynamoNamespace(service.GlobalDynamoNamespace, s.GetNamespace(), s.GetName())
}
// HasEPPService returns true if any service in the DGD has EPP component type // HasEPPService returns true if any service in the DGD has EPP component type
func (dgd *DynamoGraphDeployment) HasEPPService() bool { func (dgd *DynamoGraphDeployment) HasEPPService() bool {
for _, component := range dgd.Spec.Services { for _, component := range dgd.Spec.Services {
...@@ -279,6 +320,11 @@ func (dgd *DynamoGraphDeployment) HasEPPService() bool { ...@@ -279,6 +320,11 @@ func (dgd *DynamoGraphDeployment) HasEPPService() bool {
return false return false
} }
// GetDynamoNamespaceForService resolves the Dynamo namespace of one service in
// this graph deployment. It passes the service's GlobalDynamoNamespace setting,
// together with the deployment's own Kubernetes namespace and name, to
// ComputeDynamoNamespace and returns that result unchanged.
func (s *DynamoGraphDeployment) GetDynamoNamespaceForService(service *DynamoComponentDeploymentSharedSpec) string {
	useGlobal := service.GlobalDynamoNamespace
	k8sNamespace := s.GetNamespace()
	deploymentName := s.GetName()
	return ComputeDynamoNamespace(useGlobal, k8sNamespace, deploymentName)
}
// GetEPPService returns the EPP service name and spec if present // GetEPPService returns the EPP service name and spec if present
func (dgd *DynamoGraphDeployment) GetEPPService() (string, *DynamoComponentDeploymentSharedSpec, bool) { func (dgd *DynamoGraphDeployment) GetEPPService() (string, *DynamoComponentDeploymentSharedSpec, bool) {
for serviceName, component := range dgd.Spec.Services { for serviceName, component := range dgd.Spec.Services {
......
...@@ -968,6 +968,11 @@ func (in *DynamoGraphDeploymentStatus) DeepCopyInto(out *DynamoGraphDeploymentSt ...@@ -968,6 +968,11 @@ func (in *DynamoGraphDeploymentStatus) DeepCopyInto(out *DynamoGraphDeploymentSt
(*out)[key] = val (*out)[key] = val
} }
} }
if in.RollingUpdate != nil {
in, out := &in.RollingUpdate, &out.RollingUpdate
*out = new(RollingUpdateStatus)
(*in).DeepCopyInto(*out)
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentStatus. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentStatus.
...@@ -1491,6 +1496,34 @@ func (in *RestartStrategy) DeepCopy() *RestartStrategy { ...@@ -1491,6 +1496,34 @@ func (in *RestartStrategy) DeepCopy() *RestartStrategy {
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
// Scalar fields are copied by plain assignment; the two timestamp pointers and
// the UpdatedServices slice are then re-allocated so out shares no memory with in.
func (in *RollingUpdateStatus) DeepCopyInto(out *RollingUpdateStatus) {
	*out = *in
	if src := in.StartTime; src != nil {
		out.StartTime = src.DeepCopy()
	}
	if src := in.EndTime; src != nil {
		out.EndTime = src.DeepCopy()
	}
	if src := in.UpdatedServices; src != nil {
		// Allocate a fresh backing array; a nil slice stays nil.
		dup := make([]string, len(src))
		copy(dup, src)
		out.UpdatedServices = dup
	}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RollingUpdateStatus.
// A nil receiver yields nil; otherwise the copy is produced by DeepCopyInto.
func (in *RollingUpdateStatus) DeepCopy() *RollingUpdateStatus {
	if in == nil {
		return nil
	}
	var clone RollingUpdateStatus
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ScalingAdapter) DeepCopyInto(out *ScalingAdapter) { func (in *ScalingAdapter) DeepCopyInto(out *ScalingAdapter) {
*out = *in *out = *in
...@@ -1549,6 +1582,11 @@ func (in *ServiceCheckpointStatus) DeepCopy() *ServiceCheckpointStatus { ...@@ -1549,6 +1582,11 @@ func (in *ServiceCheckpointStatus) DeepCopy() *ServiceCheckpointStatus {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ServiceReplicaStatus) DeepCopyInto(out *ServiceReplicaStatus) { func (in *ServiceReplicaStatus) DeepCopyInto(out *ServiceReplicaStatus) {
*out = *in *out = *in
if in.ComponentNames != nil {
in, out := &in.ComponentNames, &out.ComponentNames
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.ReadyReplicas != nil { if in.ReadyReplicas != nil {
in, out := &in.ReadyReplicas, &out.ReadyReplicas in, out := &in.ReadyReplicas, &out.ReadyReplicas
*out = new(int32) *out = new(int32)
......
...@@ -11171,8 +11171,19 @@ spec: ...@@ -11171,8 +11171,19 @@ spec:
- LeaderWorkerSet - LeaderWorkerSet
type: string type: string
componentName: componentName:
description: ComponentName is the name of the underlying resource. description: |-
ComponentName is the name of the primary underlying resource.
DEPRECATED: Use ComponentNames instead. This field will be removed in a future release.
During rolling updates, this reflects the new (target) component name.
type: string type: string
componentNames:
description: |-
ComponentNames is the list of underlying resource names for this service.
During normal operation, this contains a single name.
During rolling updates, this contains both old and new component names.
items:
type: string
type: array
readyReplicas: readyReplicas:
description: |- description: |-
ReadyReplicas is the number of ready replicas. ReadyReplicas is the number of ready replicas.
......
...@@ -11386,6 +11386,37 @@ spec: ...@@ -11386,6 +11386,37 @@ spec:
description: Phase is the phase of the restart. description: Phase is the phase of the restart.
type: string type: string
type: object type: object
rollingUpdate:
description: |-
RollingUpdate tracks the progress of operator managed rolling updates.
Currently only supported for single-node, non-Grove deployments (DCD/Deployment).
properties:
endTime:
description: EndTime is when the rolling update completed (successfully or failed).
format: date-time
type: string
phase:
description: Phase indicates the current phase of the rolling update.
enum:
- Pending
- InProgress
- Completed
- Failed
- ""
type: string
startTime:
description: StartTime is when the rolling update began.
format: date-time
type: string
updatedServices:
description: |-
UpdatedServices is the list of services that have completed the rolling update.
A service is considered updated when its new replicas are all ready and old replicas are fully scaled down.
Only services of componentType Worker (or Prefill/Decode) are considered.
items:
type: string
type: array
type: object
services: services:
additionalProperties: additionalProperties:
description: ServiceReplicaStatus contains replica information for a single service. description: ServiceReplicaStatus contains replica information for a single service.
...@@ -11409,8 +11440,19 @@ spec: ...@@ -11409,8 +11440,19 @@ spec:
- LeaderWorkerSet - LeaderWorkerSet
type: string type: string
componentName: componentName:
description: ComponentName is the name of the underlying resource. description: |-
ComponentName is the name of the primary underlying resource.
DEPRECATED: Use ComponentNames instead. This field will be removed in a future release.
During rolling updates, this reflects the new (target) component name.
type: string type: string
componentNames:
description: |-
ComponentNames is the list of underlying resource names for this service.
During normal operation, this contains a single name.
During rolling updates, this contains both old and new component names.
items:
type: string
type: array
readyReplicas: readyReplicas:
description: |- description: |-
ReadyReplicas is the number of ready replicas. ReadyReplicas is the number of ready replicas.
......
...@@ -54,6 +54,7 @@ const ( ...@@ -54,6 +54,7 @@ const (
KubeAnnotationDynamoBaseModel = "nvidia.com/dynamo-base-model" KubeAnnotationDynamoBaseModel = "nvidia.com/dynamo-base-model"
KubeLabelDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend" KubeLabelDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend"
KubeLabelDynamoDiscoveryEnabled = "nvidia.com/dynamo-discovery-enabled" KubeLabelDynamoDiscoveryEnabled = "nvidia.com/dynamo-discovery-enabled"
KubeLabelDynamoWorkerHash = "nvidia.com/dynamo-worker-hash"
KubeLabelValueFalse = "false" KubeLabelValueFalse = "false"
KubeLabelValueTrue = "true" KubeLabelValueTrue = "true"
...@@ -62,10 +63,12 @@ const ( ...@@ -62,10 +63,12 @@ const (
KubeResourceGPUNvidia = "nvidia.com/gpu" KubeResourceGPUNvidia = "nvidia.com/gpu"
DynamoDeploymentConfigEnvVar = "DYN_DEPLOYMENT_CONFIG" DynamoDeploymentConfigEnvVar = "DYN_DEPLOYMENT_CONFIG"
DynamoNamespaceEnvVar = "DYN_NAMESPACE" DynamoNamespaceEnvVar = "DYN_NAMESPACE"
DynamoComponentEnvVar = "DYN_COMPONENT" DynamoNamespacePrefixEnvVar = "DYN_NAMESPACE_PREFIX"
DynamoDiscoveryBackendEnvVar = "DYN_DISCOVERY_BACKEND" DynamoNamespaceWorkerSuffixEnvVar = "DYN_NAMESPACE_WORKER_SUFFIX"
DynamoComponentEnvVar = "DYN_COMPONENT"
DynamoDiscoveryBackendEnvVar = "DYN_DISCOVERY_BACKEND"
GlobalDynamoNamespace = "dynamo" GlobalDynamoNamespace = "dynamo"
...@@ -177,6 +180,14 @@ const ( ...@@ -177,6 +180,14 @@ const (
AnnotationDynParentDGDName = "nvidia.com/dyn-parent-dgd-name" AnnotationDynParentDGDName = "nvidia.com/dyn-parent-dgd-name"
AnnotationDynParentDGDNS = "nvidia.com/dyn-parent-dgd-namespace" AnnotationDynParentDGDNS = "nvidia.com/dyn-parent-dgd-namespace"
AnnotationDynDiscoveryBackend = "nvidia.com/dyn-discovery-backend" AnnotationDynDiscoveryBackend = "nvidia.com/dyn-discovery-backend"
// Rolling update annotations
AnnotationCurrentWorkerHash = "nvidia.com/current-worker-hash"
// LegacyWorkerHash is a sentinel value used during migration from pre-rolling-update
// operator versions. Legacy worker DCDs (those without a worker hash label) are tagged
// with this value so the existing rolling update machinery can manage the transition.
LegacyWorkerHash = "legacy"
) )
type MultinodeDeploymentType string type MultinodeDeploymentType string
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"fmt" "fmt"
"maps" "maps"
"os" "os"
"slices"
"time" "time"
appsv1 "k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
...@@ -321,6 +322,7 @@ func (r *DynamoComponentDeploymentReconciler) reconcileDeploymentResources(ctx c ...@@ -321,6 +322,7 @@ func (r *DynamoComponentDeploymentReconciler) reconcileDeploymentResources(ctx c
serviceReplicaStatus := &v1alpha1.ServiceReplicaStatus{ serviceReplicaStatus := &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: deployment.Name, ComponentName: deployment.Name,
ComponentNames: []string{deployment.Name},
Replicas: deployment.Status.Replicas, Replicas: deployment.Status.Replicas,
UpdatedReplicas: deployment.Status.UpdatedReplicas, UpdatedReplicas: deployment.Status.UpdatedReplicas,
ReadyReplicas: &deployment.Status.ReadyReplicas, ReadyReplicas: &deployment.Status.ReadyReplicas,
...@@ -498,6 +500,7 @@ func getLeaderWorkerSetReplicasStatus(leaderWorkerSet *leaderworkersetv1.LeaderW ...@@ -498,6 +500,7 @@ func getLeaderWorkerSetReplicasStatus(leaderWorkerSet *leaderworkersetv1.LeaderW
return v1alpha1.ServiceReplicaStatus{ return v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet, ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: leaderWorkerSet.Name, ComponentName: leaderWorkerSet.Name,
ComponentNames: []string{leaderWorkerSet.Name},
Replicas: leaderWorkerSet.Status.Replicas, Replicas: leaderWorkerSet.Status.Replicas,
UpdatedReplicas: leaderWorkerSet.Status.UpdatedReplicas, UpdatedReplicas: leaderWorkerSet.Status.UpdatedReplicas,
ReadyReplicas: &leaderWorkerSet.Status.ReadyReplicas, ReadyReplicas: &leaderWorkerSet.Status.ReadyReplicas,
...@@ -514,14 +517,18 @@ func combineLWSReplicaStatuses(serviceReplicaStatuses []v1alpha1.ServiceReplicaS ...@@ -514,14 +517,18 @@ func combineLWSReplicaStatuses(serviceReplicaStatuses []v1alpha1.ServiceReplicaS
if firstServiceStatus.ReadyReplicas != nil { if firstServiceStatus.ReadyReplicas != nil {
readyReplicas = *firstServiceStatus.ReadyReplicas readyReplicas = *firstServiceStatus.ReadyReplicas
} }
allNames := append([]string{}, firstServiceStatus.ComponentNames...)
for _, serviceReplicaStatus := range serviceReplicaStatuses[1:] { for _, serviceReplicaStatus := range serviceReplicaStatuses[1:] {
firstServiceStatus.Replicas += serviceReplicaStatus.Replicas firstServiceStatus.Replicas += serviceReplicaStatus.Replicas
firstServiceStatus.UpdatedReplicas += serviceReplicaStatus.UpdatedReplicas firstServiceStatus.UpdatedReplicas += serviceReplicaStatus.UpdatedReplicas
if serviceReplicaStatus.ReadyReplicas != nil { if serviceReplicaStatus.ReadyReplicas != nil {
readyReplicas += *serviceReplicaStatus.ReadyReplicas readyReplicas += *serviceReplicaStatus.ReadyReplicas
} }
allNames = append(allNames, serviceReplicaStatus.ComponentNames...)
} }
slices.Sort(allNames)
firstServiceStatus.ComponentNames = allNames
firstServiceStatus.ReadyReplicas = &readyReplicas firstServiceStatus.ReadyReplicas = &readyReplicas
return &firstServiceStatus return &firstServiceStatus
} }
......
...@@ -634,7 +634,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing. ...@@ -634,7 +634,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
ComponentType: string(commonconsts.ComponentTypeWorker), ComponentType: string(commonconsts.ComponentTypeWorker),
SubComponentType: "test-sub-component", SubComponentType: "test-sub-component",
ServiceName: "test-lws-deploy-service", ServiceName: "test-lws-deploy-service",
DynamoNamespace: &[]string{"default"}[0], DynamoNamespace: &[]string{"default-test-lws-deploy"}[0],
Multinode: &v1alpha1.MultinodeSpec{ Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 2, NodeCount: 2,
}, },
...@@ -1360,6 +1360,7 @@ func Test_reconcileLeaderWorkerSetResources(t *testing.T) { ...@@ -1360,6 +1360,7 @@ func Test_reconcileLeaderWorkerSetResources(t *testing.T) {
serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{ serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet, ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0", ComponentName: "test-component-0",
ComponentNames: []string{"test-component-0"},
ReadyReplicas: ptr.To(int32(1)), ReadyReplicas: ptr.To(int32(1)),
UpdatedReplicas: 1, UpdatedReplicas: 1,
Replicas: 1, Replicas: 1,
...@@ -1439,6 +1440,7 @@ func Test_reconcileLeaderWorkerSetResources(t *testing.T) { ...@@ -1439,6 +1440,7 @@ func Test_reconcileLeaderWorkerSetResources(t *testing.T) {
serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{ serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet, ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0", ComponentName: "test-component-0",
ComponentNames: []string{"test-component-0", "test-component-1", "test-component-2"},
ReadyReplicas: ptr.To(int32(2)), ReadyReplicas: ptr.To(int32(2)),
UpdatedReplicas: 2, UpdatedReplicas: 2,
Replicas: 3, Replicas: 3,
...@@ -1518,6 +1520,7 @@ func Test_reconcileLeaderWorkerSetResources(t *testing.T) { ...@@ -1518,6 +1520,7 @@ func Test_reconcileLeaderWorkerSetResources(t *testing.T) {
serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{ serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet, ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0", ComponentName: "test-component-0",
ComponentNames: []string{"test-component-0", "test-component-1", "test-component-2"},
ReadyReplicas: ptr.To(int32(3)), ReadyReplicas: ptr.To(int32(3)),
UpdatedReplicas: 3, UpdatedReplicas: 3,
Replicas: 3, Replicas: 3,
...@@ -1662,6 +1665,7 @@ func Test_reconcileDeploymentResources(t *testing.T) { ...@@ -1662,6 +1665,7 @@ func Test_reconcileDeploymentResources(t *testing.T) {
serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{ serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-component", ComponentName: "test-component",
ComponentNames: []string{"test-component"},
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)), ReadyReplicas: ptr.To(int32(2)),
...@@ -1703,6 +1707,7 @@ func Test_reconcileDeploymentResources(t *testing.T) { ...@@ -1703,6 +1707,7 @@ func Test_reconcileDeploymentResources(t *testing.T) {
serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{ serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-component", ComponentName: "test-component",
ComponentNames: []string{"test-component"},
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)), ReadyReplicas: ptr.To(int32(1)),
......
...@@ -187,6 +187,40 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr ...@@ -187,6 +187,40 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
} }
} }
if r.supportsManagedRollingUpdate(dynamoDeployment) {
if err = r.initializeWorkerHashIfNeeded(ctx, dynamoDeployment); err != nil {
logger.Error(err, "Failed to initialize worker hash")
reason = "failed_to_initialize_worker_hash"
return ctrl.Result{}, err
}
if r.isRollingUpdateInProgress(dynamoDeployment) || r.shouldTriggerRollingUpdate(dynamoDeployment) {
if err = r.reconcileRollingUpdate(ctx, dynamoDeployment); err != nil {
logger.Error(err, "Failed to reconcile rolling update")
state = DGDStateFailed
reason = Reason("RollingUpdateFailed")
message = Message(err.Error())
return ctrl.Result{}, err
}
}
} else {
// For unsupported pathways, log if a rolling update would have been triggered
if r.shouldTriggerRollingUpdate(dynamoDeployment) {
logger.Info("Worker spec change detected but rolling update not supported for this pathway",
"isGrove", r.isGrovePathway(dynamoDeployment),
"hasMultinode", dynamoDeployment.HasAnyMultinodeService())
r.Recorder.Event(dynamoDeployment, corev1.EventTypeWarning, "RollingUpdateNotSupported",
"Worker spec changed but custom rolling updates are not supported for Grove/multinode deployments")
// Update the hash to prevent repeated warnings
hash := dynamo.ComputeDGDWorkersSpecHash(dynamoDeployment)
r.setCurrentWorkerHash(dynamoDeployment, hash)
if updateErr := r.Update(ctx, dynamoDeployment); updateErr != nil {
logger.Error(updateErr, "Failed to update worker hash for unsupported pathway")
}
}
}
reconcileResult, err := r.reconcileResources(ctx, dynamoDeployment) reconcileResult, err := r.reconcileResources(ctx, dynamoDeployment)
state = reconcileResult.State state = reconcileResult.State
...@@ -201,6 +235,21 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr ...@@ -201,6 +235,21 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
return ctrl.Result{}, err return ctrl.Result{}, err
} }
// Override state based on rolling update status if a rolling update is in progress
if dynamoDeployment.Status.RollingUpdate != nil {
switch dynamoDeployment.Status.RollingUpdate.Phase {
case nvidiacomv1alpha1.RollingUpdatePhaseCompleted:
// Keep the reconcileResult state (should be Ready if resources are ready)
case nvidiacomv1alpha1.RollingUpdatePhasePending, nvidiacomv1alpha1.RollingUpdatePhaseInProgress:
// Rolling update in progress - resources are being transitioned
if state != DGDStateFailed {
state = DGDStatePending
reason = "rolling_update_in_progress"
message = "Rolling update in progress"
}
}
}
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
...@@ -403,7 +452,8 @@ func isRestartAlreadyProcessed(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) boo ...@@ -403,7 +452,8 @@ func isRestartAlreadyProcessed(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) boo
if dgd.Spec.Restart.ID == dgd.Status.Restart.ObservedID && if dgd.Spec.Restart.ID == dgd.Status.Restart.ObservedID &&
(dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseCompleted || (dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseCompleted ||
dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseFailed) { dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseFailed ||
dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseSuperseded) {
return true return true
} }
...@@ -610,7 +660,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -610,7 +660,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
if component.Ingress != nil { if component.Ingress != nil {
ingressSpec = *component.Ingress ingressSpec = *component.Ingress
} }
mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec) mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDCDResourceName(dynamoDeployment, componentName, ""), dynamoDeployment.Namespace, ingressSpec)
_, syncedMainComponentIngress, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) { _, syncedMainComponentIngress, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil { if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
logger.Info("Ingress is not enabled") logger.Info("Ingress is not enabled")
...@@ -634,7 +684,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -634,7 +684,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
} }
// generate the main component virtual service // generate the main component virtual service
if r.Config.IngressConfig.UseVirtualService() { if r.Config.IngressConfig.UseVirtualService() {
mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec) mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDCDResourceName(dynamoDeployment, componentName, ""), dynamoDeployment.Namespace, ingressSpec)
_, syncedMainComponentVirtualService, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) { _, syncedMainComponentVirtualService, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
if !ingressSpec.IsVirtualServiceEnabled() { if !ingressSpec.IsVirtualServiceEnabled() {
logger.Info("VirtualService is not enabled") logger.Info("VirtualService is not enabled")
...@@ -826,17 +876,27 @@ func (r *DynamoGraphDeploymentReconciler) computeRestartStatus(ctx context.Conte ...@@ -826,17 +876,27 @@ func (r *DynamoGraphDeploymentReconciler) computeRestartStatus(ctx context.Conte
// No restart requested // No restart requested
if dgd.Spec.Restart == nil || dgd.Spec.Restart.ID == "" { if dgd.Spec.Restart == nil || dgd.Spec.Restart.ID == "" {
// Preserve existing terminal status // Preserve existing terminal status
if dgd.Status.Restart != nil && (dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseCompleted || dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseFailed) { if dgd.Status.Restart != nil && (dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseCompleted || dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseFailed || dgd.Status.Restart.Phase == nvidiacomv1alpha1.RestartPhaseSuperseded) {
return dgd.Status.Restart return dgd.Status.Restart
} }
return nil return nil
} }
// If restart was already processed (completed or failed), return existing status // If restart was already processed (completed, failed, or superseded), return existing status
if isRestartAlreadyProcessed(dgd) { if isRestartAlreadyProcessed(dgd) {
return dgd.Status.Restart return dgd.Status.Restart
} }
// Supersede restart if a rolling update is in progress
if r.isRollingUpdateInProgress(dgd) {
r.Recorder.Eventf(dgd, corev1.EventTypeWarning, "RestartSuperseded",
"Restart %s superseded by rolling update", dgd.Spec.Restart.ID)
return &nvidiacomv1alpha1.RestartStatus{
ObservedID: dgd.Spec.Restart.ID,
Phase: nvidiacomv1alpha1.RestartPhaseSuperseded,
}
}
order := dynamo.GetRestartOrder(dgd) order := dynamo.GetRestartOrder(dgd)
if dynamo.IsParallelRestart(dgd) { if dynamo.IsParallelRestart(dgd) {
...@@ -848,7 +908,7 @@ func (r *DynamoGraphDeploymentReconciler) computeRestartStatus(ctx context.Conte ...@@ -848,7 +908,7 @@ func (r *DynamoGraphDeploymentReconciler) computeRestartStatus(ctx context.Conte
// checkComponentServiceFullyUpdated checks if a DynamoComponentDeployment is fully updated. // checkComponentServiceFullyUpdated checks if a DynamoComponentDeployment is fully updated.
func (r *DynamoGraphDeploymentReconciler) checkComponentServiceFullyUpdated(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, serviceName string) (bool, string) { func (r *DynamoGraphDeploymentReconciler) checkComponentServiceFullyUpdated(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, serviceName string) (bool, string) {
resourceName := dynamo.GetDynamoComponentName(dgd, serviceName) resourceName := dynamo.GetDCDResourceName(dgd, serviceName, r.getCurrentWorkerHash(dgd))
return checkDCDReady(ctx, r.Client, resourceName, dgd.Namespace) return checkDCDReady(ctx, r.Client, resourceName, dgd.Namespace)
} }
...@@ -961,44 +1021,79 @@ func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(c ...@@ -961,44 +1021,79 @@ func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(c
resources := []Resource{} resources := []Resource{}
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
rollingUpdateCtx := r.buildRollingUpdateContext(ctx, dynamoDeployment)
existingRestartAnnotations, err := r.getExistingRestartAnnotationsDCD(ctx, dynamoDeployment) existingRestartAnnotations, err := r.getExistingRestartAnnotationsDCD(ctx, dynamoDeployment)
if err != nil { if err != nil {
logger.Error(err, "failed to get existing restart annotations") logger.Error(err, "failed to get existing restart annotations")
return ReconcileResult{}, fmt.Errorf("failed to get existing restart annotations: %w", err) return ReconcileResult{}, fmt.Errorf("failed to get existing restart annotations: %w", err)
} }
if rollingUpdateCtx.InProgress() {
logger.Info("Rolling update in progress",
"newWorkerHash", rollingUpdateCtx.NewWorkerHash,
"oldWorkerReplicas", rollingUpdateCtx.OldWorkerReplicas)
}
// generate the dynamoComponentsDeployments from the config // Generate all DCDs (handles both normal and rolling update cases)
defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig) dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(
dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, &defaultIngressSpec, restartState, existingRestartAnnotations) ctx, dynamoDeployment, &defaultIngressSpec, restartState, existingRestartAnnotations, rollingUpdateCtx,
)
if err != nil { if err != nil {
logger.Error(err, "failed to generate the DynamoComponentsDeployments") logger.Error(err, "failed to generate the DynamoComponentsDeployments")
return ReconcileResult{}, fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err) return ReconcileResult{}, fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err)
} }
// reconcile the dynamoComponentsDeployments // Sync all generated DCDs
for serviceName, dynamoComponentDeployment := range dynamoComponentsDeployments { for key, dcd := range dynamoComponentsDeployments {
logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", dynamoComponentDeployment) logger.Info("Reconciling DynamoComponentDeployment", "key", key, "name", dcd.Name)
_, dynamoComponentDeployment, err = commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) { _, syncedDCD, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) {
return dynamoComponentDeployment, false, nil return dcd, false, nil
}) })
if err != nil { if err != nil {
logger.Error(err, "failed to sync the DynamoComponentDeployment") logger.Error(err, "failed to sync the DynamoComponentDeployment", "name", dcd.Name)
return ReconcileResult{}, fmt.Errorf("failed to sync the DynamoComponentDeployment: %w", err) return ReconcileResult{}, fmt.Errorf("failed to sync the DynamoComponentDeployment: %w", err)
} }
resources = append(resources, dynamoComponentDeployment) resources = append(resources, syncedDCD)
}
// During rolling update, scale old worker DCDs via direct patching.
// This is done separately from DCD generation to avoid overwriting the old spec
// with the new spec (which would trigger an unwanted rolling update on old workers).
if rollingUpdateCtx.InProgress() {
if err := r.scaleOldWorkerDCDs(ctx, dynamoDeployment, rollingUpdateCtx); err != nil {
logger.Error(err, "failed to scale old worker DCDs")
return ReconcileResult{}, fmt.Errorf("failed to scale old worker DCDs: %w", err)
}
} }
// Check resource readiness // Check resource readiness
result := r.checkResourcesReadiness(resources) result := r.checkResourcesReadiness(resources)
// During rolling updates, aggregate old worker service statuses into the result
// so that Replicas, ReadyReplicas, etc. reflect the total across old and new DCDs.
if rollingUpdateCtx.InProgress() {
oldWorkerStatuses, err := r.aggregateOldWorkerServiceStatuses(ctx, dynamoDeployment, rollingUpdateCtx)
if err != nil {
logger.Error(err, "failed to aggregate old worker service statuses")
// Non-fatal: continue with partial status
} else if len(oldWorkerStatuses) > 0 {
mergeWorkerServiceStatuses(result.ServiceStatus, oldWorkerStatuses)
}
}
return result, nil return result, nil
} }
func (r *DynamoGraphDeploymentReconciler) getExistingRestartAnnotationsDCD(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) (map[string]string, error) { func (r *DynamoGraphDeploymentReconciler) getExistingRestartAnnotationsDCD(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) (map[string]string, error) {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
computedHash := dynamo.ComputeDGDWorkersSpecHash(dgd)
restartAnnotations := make(map[string]string) restartAnnotations := make(map[string]string)
for serviceName := range dgd.Spec.Services { for serviceName := range dgd.Spec.Services {
dcdName := dynamo.GetDynamoComponentName(dgd, serviceName) dcdName := dynamo.GetDCDResourceName(dgd, serviceName, computedHash)
existingDCD := &nvidiacomv1alpha1.DynamoComponentDeployment{} existingDCD := &nvidiacomv1alpha1.DynamoComponentDeployment{}
err := r.Get(ctx, types.NamespacedName{Name: dcdName, Namespace: dgd.Namespace}, existingDCD) err := r.Get(ctx, types.NamespacedName{Name: dcdName, Namespace: dgd.Namespace}, existingDCD)
...@@ -1437,7 +1532,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileEPPResources(ctx context.Cont ...@@ -1437,7 +1532,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileEPPResources(ctx context.Cont
// 2. Reconcile InferencePool // 2. Reconcile InferencePool
// Note: EPP Service is created automatically by the standard component reconciliation // Note: EPP Service is created automatically by the standard component reconciliation
// via GenerateComponentService() in graph.go (see ComponentTypeEPP case) // via GenerateComponentService() in graph.go (see ComponentTypeEPP case)
eppServiceName := dynamo.GetDynamoComponentName(dgd, componentName) eppServiceName := dynamo.GetDCDResourceName(dgd, componentName, "")
inferencePool, err := epp.GenerateInferencePool(dgd, componentName, eppServiceName, eppService.EPPConfig) inferencePool, err := epp.GenerateInferencePool(dgd, componentName, eppServiceName, eppService.EPPConfig)
if err != nil { if err != nil {
logger.Error(err, "Failed to generate EPP InferencePool") logger.Error(err, "Failed to generate EPP InferencePool")
......
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
"testing" "testing"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts" commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common" "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
...@@ -157,7 +156,7 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T) ...@@ -157,7 +156,7 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T)
Name: "test-dgd-frontend", Name: "test-dgd-frontend",
Namespace: "default", Namespace: "default",
Labels: map[string]string{ Labels: map[string]string{
consts.KubeLabelDynamoGraphDeploymentName: "test-dgd", commonconsts.KubeLabelDynamoGraphDeploymentName: "test-dgd",
}, },
OwnerReferences: []metav1.OwnerReference{ OwnerReferences: []metav1.OwnerReference{
{ {
...@@ -181,7 +180,7 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T) ...@@ -181,7 +180,7 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T)
Name: "test-dgd-removed", Name: "test-dgd-removed",
Namespace: "default", Namespace: "default",
Labels: map[string]string{ Labels: map[string]string{
consts.KubeLabelDynamoGraphDeploymentName: "test-dgd", commonconsts.KubeLabelDynamoGraphDeploymentName: "test-dgd",
}, },
OwnerReferences: []metav1.OwnerReference{ OwnerReferences: []metav1.OwnerReference{
{ {
...@@ -230,7 +229,7 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T) ...@@ -230,7 +229,7 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T)
Name: "test-dgd-frontend", Name: "test-dgd-frontend",
Namespace: "default", Namespace: "default",
Labels: map[string]string{ Labels: map[string]string{
consts.KubeLabelDynamoGraphDeploymentName: "test-dgd", commonconsts.KubeLabelDynamoGraphDeploymentName: "test-dgd",
}, },
OwnerReferences: []metav1.OwnerReference{ OwnerReferences: []metav1.OwnerReference{
{ {
...@@ -412,6 +411,7 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -412,6 +411,7 @@ func Test_reconcileGroveResources(t *testing.T) {
"frontend": { "frontend": {
ComponentKind: v1alpha1.ComponentKindPodClique, ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-frontend", ComponentName: "test-dgd-0-frontend",
ComponentNames: []string{"test-dgd-0-frontend"},
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)), ReadyReplicas: ptr.To(int32(2)),
...@@ -474,6 +474,7 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -474,6 +474,7 @@ func Test_reconcileGroveResources(t *testing.T) {
"frontend": { "frontend": {
ComponentKind: v1alpha1.ComponentKindPodClique, ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-frontend", ComponentName: "test-dgd-0-frontend",
ComponentNames: []string{"test-dgd-0-frontend"},
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)), ReadyReplicas: ptr.To(int32(1)),
...@@ -481,6 +482,7 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -481,6 +482,7 @@ func Test_reconcileGroveResources(t *testing.T) {
"decode": { "decode": {
ComponentKind: v1alpha1.ComponentKindPodClique, ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-decode", ComponentName: "test-dgd-0-decode",
ComponentNames: []string{"test-dgd-0-decode"},
Replicas: 2, Replicas: 2,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)), ReadyReplicas: ptr.To(int32(1)),
...@@ -549,6 +551,7 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -549,6 +551,7 @@ func Test_reconcileGroveResources(t *testing.T) {
"decode": { "decode": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup, ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-decode", ComponentName: "test-dgd-0-decode",
ComponentNames: []string{"test-dgd-0-decode"},
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
AvailableReplicas: ptr.To(int32(1)), AvailableReplicas: ptr.To(int32(1)),
...@@ -556,6 +559,7 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -556,6 +559,7 @@ func Test_reconcileGroveResources(t *testing.T) {
"prefill": { "prefill": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup, ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-prefill", ComponentName: "test-dgd-0-prefill",
ComponentNames: []string{"test-dgd-0-prefill"},
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
AvailableReplicas: ptr.To(int32(1)), AvailableReplicas: ptr.To(int32(1)),
...@@ -621,6 +625,7 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -621,6 +625,7 @@ func Test_reconcileGroveResources(t *testing.T) {
"frontend": { "frontend": {
ComponentKind: v1alpha1.ComponentKindPodClique, ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-frontend", ComponentName: "test-dgd-0-frontend",
ComponentNames: []string{"test-dgd-0-frontend"},
Replicas: 1, Replicas: 1,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)), ReadyReplicas: ptr.To(int32(1)),
...@@ -628,6 +633,7 @@ func Test_reconcileGroveResources(t *testing.T) { ...@@ -628,6 +633,7 @@ func Test_reconcileGroveResources(t *testing.T) {
"aggregated": { "aggregated": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup, ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-aggregated", ComponentName: "test-dgd-0-aggregated",
ComponentNames: []string{"test-dgd-0-aggregated"},
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
AvailableReplicas: ptr.To(int32(1)), AvailableReplicas: ptr.To(int32(1)),
...@@ -1336,6 +1342,114 @@ func Test_computeRestartStatus(t *testing.T) { ...@@ -1336,6 +1342,114 @@ func Test_computeRestartStatus(t *testing.T) {
InProgress: []string{"frontend"}, // Reset to FIRST service InProgress: []string{"frontend"}, // Reset to FIRST service
}, },
}, },
{
name: "rolling update in progress + new restart request - superseded",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
RollingUpdate: &v1alpha1.RollingUpdateStatus{
Phase: v1alpha1.RollingUpdatePhaseInProgress,
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseSuperseded,
},
},
{
name: "rolling update pending + restart already in progress - superseded",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: oldID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
RollingUpdate: &v1alpha1.RollingUpdateStatus{
Phase: v1alpha1.RollingUpdatePhasePending,
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseSuperseded,
},
},
{
name: "rolling update completed + restart request - normal processing",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
RollingUpdate: &v1alpha1.RollingUpdateStatus{
Phase: v1alpha1.RollingUpdatePhaseCompleted,
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseRestarting,
InProgress: []string{"frontend"},
},
},
{
name: "restart already processed as superseded - returns existing status",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Restart: &v1alpha1.Restart{
ID: newID,
},
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
Replicas: ptr.To(int32(1)),
},
},
},
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseSuperseded,
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: newID,
Phase: v1alpha1.RestartPhaseSuperseded,
},
},
{
name: "no restart requested but has superseded status - preserves status",
dgdStatus: v1alpha1.DynamoGraphDeploymentStatus{
Restart: &v1alpha1.RestartStatus{
ObservedID: oldID,
Phase: v1alpha1.RestartPhaseSuperseded,
},
},
wantRestartStatus: &v1alpha1.RestartStatus{
ObservedID: oldID,
Phase: v1alpha1.RestartPhaseSuperseded,
},
},
} }
for _, tt := range tests { for _, tt := range tests {
...@@ -1577,7 +1691,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1577,7 +1691,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
&v1alpha1.DynamoComponentDeployment{ &v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-decode", Name: "test-dgd-decode-e1f2a6fe",
Namespace: "default", Namespace: "default",
}, },
Spec: v1alpha1.DynamoComponentDeploymentSpec{ Spec: v1alpha1.DynamoComponentDeploymentSpec{
...@@ -1596,7 +1710,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1596,7 +1710,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
Service: &v1alpha1.ServiceReplicaStatus{ Service: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment", ComponentName: "test-dgd-decode-e1f2a6fe-deployment",
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)), ReadyReplicas: ptr.To(int32(2)),
...@@ -1606,7 +1720,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1606,7 +1720,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
&v1alpha1.DynamoComponentDeployment{ &v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-prefill", Name: "test-dgd-prefill-e1f2a6fe",
Namespace: "default", Namespace: "default",
}, },
Spec: v1alpha1.DynamoComponentDeploymentSpec{ Spec: v1alpha1.DynamoComponentDeploymentSpec{
...@@ -1625,7 +1739,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1625,7 +1739,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
Service: &v1alpha1.ServiceReplicaStatus{ Service: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-prefill-deployment", ComponentName: "test-dgd-prefill-e1f2a6fe-deployment",
Replicas: 3, Replicas: 3,
UpdatedReplicas: 3, UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)), ReadyReplicas: ptr.To(int32(3)),
...@@ -1649,7 +1763,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1649,7 +1763,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
"decode": { "decode": {
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment", ComponentName: "test-dgd-decode-e1f2a6fe-deployment",
Replicas: 2, Replicas: 2,
UpdatedReplicas: 2, UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)), ReadyReplicas: ptr.To(int32(2)),
...@@ -1657,7 +1771,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1657,7 +1771,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
"prefill": { "prefill": {
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-prefill-deployment", ComponentName: "test-dgd-prefill-e1f2a6fe-deployment",
Replicas: 3, Replicas: 3,
UpdatedReplicas: 3, UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)), ReadyReplicas: ptr.To(int32(3)),
...@@ -1723,7 +1837,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1723,7 +1837,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
&v1alpha1.DynamoComponentDeployment{ &v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-decode", Name: "test-dgd-decode-e1f2a6fe",
Namespace: "default", Namespace: "default",
}, },
Spec: v1alpha1.DynamoComponentDeploymentSpec{ Spec: v1alpha1.DynamoComponentDeploymentSpec{
...@@ -1742,7 +1856,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1742,7 +1856,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
Service: &v1alpha1.ServiceReplicaStatus{ Service: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment", ComponentName: "test-dgd-decode-e1f2a6fe-deployment",
Replicas: 2, Replicas: 2,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)), ReadyReplicas: ptr.To(int32(1)),
...@@ -1752,7 +1866,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1752,7 +1866,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
&v1alpha1.DynamoComponentDeployment{ &v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-prefill", Name: "test-dgd-prefill-e1f2a6fe",
Namespace: "default", Namespace: "default",
}, },
Spec: v1alpha1.DynamoComponentDeploymentSpec{ Spec: v1alpha1.DynamoComponentDeploymentSpec{
...@@ -1771,7 +1885,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1771,7 +1885,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
Service: &v1alpha1.ServiceReplicaStatus{ Service: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-prefill-deployment", ComponentName: "test-dgd-prefill-e1f2a6fe-deployment",
Replicas: 3, Replicas: 3,
UpdatedReplicas: 3, UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)), ReadyReplicas: ptr.To(int32(3)),
...@@ -1783,7 +1897,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1783,7 +1897,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
wantReconcileResult: ReconcileResult{ wantReconcileResult: ReconcileResult{
State: DGDStatePending, State: DGDStatePending,
Reason: "some_resources_are_not_ready", Reason: "some_resources_are_not_ready",
Message: "Resources not ready: test-dgd-decode: Component deployment not ready - Available condition not true", Message: "Resources not ready: test-dgd-decode-e1f2a6fe: Component deployment not ready - Available condition not true",
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{ ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": { "frontend": {
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
...@@ -1795,7 +1909,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1795,7 +1909,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
"decode": { "decode": {
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment", ComponentName: "test-dgd-decode-e1f2a6fe-deployment",
Replicas: 2, Replicas: 2,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)), ReadyReplicas: ptr.To(int32(1)),
...@@ -1803,7 +1917,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1803,7 +1917,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
"prefill": { "prefill": {
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-prefill-deployment", ComponentName: "test-dgd-prefill-e1f2a6fe-deployment",
Replicas: 3, Replicas: 3,
UpdatedReplicas: 3, UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)), ReadyReplicas: ptr.To(int32(3)),
...@@ -1863,7 +1977,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1863,7 +1977,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
&v1alpha1.DynamoComponentDeployment{ &v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-decode", Name: "test-dgd-decode-5f3d46ba",
Namespace: "default", Namespace: "default",
}, },
Spec: v1alpha1.DynamoComponentDeploymentSpec{ Spec: v1alpha1.DynamoComponentDeploymentSpec{
...@@ -1882,7 +1996,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1882,7 +1996,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
Service: &v1alpha1.ServiceReplicaStatus{ Service: &v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment", ComponentName: "test-dgd-decode-5f3d46ba-deployment",
Replicas: 2, Replicas: 2,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)), ReadyReplicas: ptr.To(int32(1)),
...@@ -1894,7 +2008,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1894,7 +2008,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
wantReconcileResult: ReconcileResult{ wantReconcileResult: ReconcileResult{
State: DGDStatePending, State: DGDStatePending,
Reason: "some_resources_are_not_ready", Reason: "some_resources_are_not_ready",
Message: "Resources not ready: test-dgd-decode: Component deployment not ready - Available condition not true; test-dgd-frontend: Component deployment not ready - Available condition not true", Message: "Resources not ready: test-dgd-decode-5f3d46ba: Component deployment not ready - Available condition not true; test-dgd-frontend: Component deployment not ready - Available condition not true",
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{ ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": { "frontend": {
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
...@@ -1906,7 +2020,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -1906,7 +2020,7 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}, },
"decode": { "decode": {
ComponentKind: v1alpha1.ComponentKindDeployment, ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment", ComponentName: "test-dgd-decode-5f3d46ba-deployment",
Replicas: 2, Replicas: 2,
UpdatedReplicas: 1, UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)), ReadyReplicas: ptr.To(int32(1)),
......
...@@ -50,6 +50,7 @@ type ComponentContext struct { ...@@ -50,6 +50,7 @@ type ComponentContext struct {
ParentGraphDeploymentNamespace string ParentGraphDeploymentNamespace string
DiscoveryBackend string DiscoveryBackend string
EPPConfig *v1alpha1.EPPConfig EPPConfig *v1alpha1.EPPConfig
WorkerHashSuffix string
} }
func (b *BaseComponentDefaults) GetBaseContainer(context ComponentContext) (corev1.Container, error) { func (b *BaseComponentDefaults) GetBaseContainer(context ComponentContext) (corev1.Container, error) {
......
...@@ -76,6 +76,10 @@ func (f *FrontendDefaults) GetBaseContainer(context ComponentContext) (corev1.Co ...@@ -76,6 +76,10 @@ func (f *FrontendDefaults) GetBaseContainer(context ComponentContext) (corev1.Co
Name: "DYN_HTTP_PORT", // TODO: need to reconcile DYNAMO_PORT and DYN_HTTP_PORT Name: "DYN_HTTP_PORT", // TODO: need to reconcile DYNAMO_PORT and DYN_HTTP_PORT
Value: fmt.Sprintf("%d", commonconsts.DynamoServicePort), Value: fmt.Sprintf("%d", commonconsts.DynamoServicePort),
}, },
{
Name: commonconsts.DynamoNamespacePrefixEnvVar,
Value: context.DynamoNamespace,
},
}...) }...)
return container, nil return container, nil
......
...@@ -109,5 +109,14 @@ func (w *WorkerDefaults) GetBaseContainer(context ComponentContext) (corev1.Cont ...@@ -109,5 +109,14 @@ func (w *WorkerDefaults) GetBaseContainer(context ComponentContext) (corev1.Cont
}, },
}...) }...)
if context.WorkerHashSuffix != "" {
container.Env = append(container.Env, []corev1.EnvVar{
{
Name: commonconsts.DynamoNamespaceWorkerSuffixEnvVar,
Value: context.WorkerHashSuffix,
},
}...)
}
return container, nil return container, nil
} }
...@@ -84,6 +84,12 @@ func DetermineRestartState(dgd *v1alpha1.DynamoGraphDeployment, restartStatus *v ...@@ -84,6 +84,12 @@ func DetermineRestartState(dgd *v1alpha1.DynamoGraphDeployment, restartStatus *v
isNewRestart := restartStatus.ObservedID == "" || isNewRestart := restartStatus.ObservedID == "" ||
dgd.Spec.Restart.ID != restartStatus.ObservedID dgd.Spec.Restart.ID != restartStatus.ObservedID
if !isNewRestart && restartStatus.Phase == v1alpha1.RestartPhaseSuperseded {
// Superseded: don't push any new annotations. Existing annotations
// are preserved via the existingRestartAnnotations fallback path.
return nil
}
if !isNewRestart && restartStatus.Phase == v1alpha1.RestartPhaseCompleted { if !isNewRestart && restartStatus.Phase == v1alpha1.RestartPhaseCompleted {
return &RestartState{ return &RestartState{
Timestamp: specID, Timestamp: specID,
...@@ -249,100 +255,149 @@ func ParseDynDeploymentConfig(ctx context.Context, jsonContent []byte) (DynDeplo ...@@ -249,100 +255,149 @@ func ParseDynDeploymentConfig(ctx context.Context, jsonContent []byte) (DynDeplo
return config, err return config, err
} }
// RollingUpdateContext provides information about an in-progress rolling update.
//
// The zero value means "no rolling update": InProgress returns false whenever
// OldWorkerReplicas is nil or empty.
type RollingUpdateContext struct {
	// NewWorkerHash is the short hash (8 chars) for the new worker spec, used for DCD naming
	NewWorkerHash string
	// OldWorkerReplicas maps service name to the desired replica count for old workers.
	// Used by the controller to patch old worker DCDs directly.
	// Calculated as: max(0, desiredReplicas - newReadyReplicas)
	OldWorkerReplicas map[string]int32
	// NewWorkerReplicas maps service name to the desired replica count for new workers.
	// Calculated as: min(desiredReplicas, newReadyReplicas + 1) to gradually scale up.
	NewWorkerReplicas map[string]int32
}

// InProgress reports whether a rolling update is in progress.
//
// It is true when OldWorkerReplicas has at least one entry, regardless of the
// replica counts stored in it (an entry with 0 replicas still counts).
// NewWorkerHash and NewWorkerReplicas are not consulted.
func (r RollingUpdateContext) InProgress() bool {
	return len(r.OldWorkerReplicas) > 0
}
// GenerateDynamoComponentsDeployments generates a map of DynamoComponentDeployments from a DynamoGraphConfig.
// The map key is a unique identifier for each DCD (serviceName).
func GenerateDynamoComponentsDeployments(
ctx context.Context,
parentDGD *v1alpha1.DynamoGraphDeployment,
defaultIngressSpec *v1alpha1.IngressSpec,
restartState *RestartState,
existingRestartAnnotations map[string]string,
rollingUpdateCtx RollingUpdateContext,
) (map[string]*v1alpha1.DynamoComponentDeployment, error) {
deployments := make(map[string]*v1alpha1.DynamoComponentDeployment) deployments := make(map[string]*v1alpha1.DynamoComponentDeployment)
for componentName, component := range parentDynamoGraphDeployment.Spec.Services {
dynamoNamespace := GetDynamoNamespace(parentDynamoGraphDeployment, component)
deployment := &v1alpha1.DynamoComponentDeployment{}
deployment.Spec.DynamoComponentDeploymentSharedSpec = *component
deployment.Name = GetDynamoComponentName(parentDynamoGraphDeployment, componentName)
deployment.Spec.BackendFramework = parentDynamoGraphDeployment.Spec.BackendFramework
deployment.Namespace = parentDynamoGraphDeployment.Namespace
deployment.Spec.ServiceName = componentName
deployment.Spec.DynamoNamespace = &dynamoNamespace
labels := make(map[string]string)
// add the labels in the spec in order to label all sub-resources
deployment.Spec.Labels = labels
// and add the labels to the deployment itself
deployment.Labels = labels
labels[commonconsts.KubeLabelDynamoComponent] = componentName
labels[commonconsts.KubeLabelDynamoNamespace] = dynamoNamespace
labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = parentDynamoGraphDeployment.Name
// Propagate annotations from parent deployment if present
if parentDynamoGraphDeployment.Annotations != nil {
if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
}
if val, exists := parentDynamoGraphDeployment.Annotations[commonconsts.KubeAnnotationEnableMetrics]; exists {
deployment.Spec.Annotations[commonconsts.KubeAnnotationEnableMetrics] = val
}
if val, exists := parentDynamoGraphDeployment.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend]; exists {
deployment.Spec.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend] = val
}
// Propagate operator origin version for version-gated behavior in backends
if val, exists := parentDynamoGraphDeployment.Annotations[commonconsts.KubeAnnotationDynamoOperatorOriginVersion]; exists {
deployment.Spec.Annotations[commonconsts.KubeAnnotationDynamoOperatorOriginVersion] = val
}
}
// Apply restart annotation if this service should be restarted. // Generate DCDs for each service
// For services not in the current restart order, preserve their existing annotation for componentName, component := range parentDGD.Spec.Services {
// to avoid triggering unwanted rollouts when a new restart begins. dynamoNamespace := parentDGD.GetDynamoNamespaceForService(component)
if restartState.ShouldAnnotateService(componentName) { dcd, err := generateSingleDCD(ctx, parentDGD, componentName, component, dynamoNamespace, defaultIngressSpec, restartState, existingRestartAnnotations, rollingUpdateCtx)
if deployment.Spec.Annotations == nil { if err != nil {
deployment.Spec.Annotations = make(map[string]string) return nil, err
}
deployment.Spec.Annotations[commonconsts.RestartAnnotation] = restartState.Timestamp
} else if existingRestartAnnotations != nil {
if existingRestartAt, ok := existingRestartAnnotations[componentName]; ok && existingRestartAt != "" {
if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
}
deployment.Spec.Annotations[commonconsts.RestartAnnotation] = existingRestartAt
}
} }
deployments[componentName] = dcd
}
if component.ComponentType == commonconsts.ComponentTypePlanner { return deployments, nil
// ensure that the extraPodSpec is not nil }
if deployment.Spec.ExtraPodSpec == nil {
deployment.Spec.ExtraPodSpec = &v1alpha1.ExtraPodSpec{} func GetDynamoNamespace(object metav1.Object, service *v1alpha1.DynamoComponentDeploymentSharedSpec) string {
} return v1alpha1.ComputeDynamoNamespace(service.GlobalDynamoNamespace, object.GetNamespace(), object.GetName())
// ensure that the embedded PodSpec struct is not nil }
if deployment.Spec.ExtraPodSpec.PodSpec == nil {
deployment.Spec.ExtraPodSpec.PodSpec = &corev1.PodSpec{} // generateSingleDCD creates a DynamoComponentDeployment for a single service.
} func generateSingleDCD(
// finally set the service account name ctx context.Context,
deployment.Spec.ExtraPodSpec.PodSpec.ServiceAccountName = commonconsts.PlannerServiceAccountName parentDGD *v1alpha1.DynamoGraphDeployment,
componentName string,
component *v1alpha1.DynamoComponentDeploymentSharedSpec,
dynamoNamespace string,
defaultIngressSpec *v1alpha1.IngressSpec,
restartState *RestartState,
existingRestartAnnotations map[string]string,
rollingUpdateCtx RollingUpdateContext,
) (*v1alpha1.DynamoComponentDeployment, error) {
deployment := &v1alpha1.DynamoComponentDeployment{}
deployment.Spec.DynamoComponentDeploymentSharedSpec = *component
deployment.Name = GetDCDResourceName(parentDGD, componentName, rollingUpdateCtx.NewWorkerHash)
deployment.Spec.BackendFramework = parentDGD.Spec.BackendFramework
deployment.Namespace = parentDGD.Namespace
deployment.Spec.ServiceName = componentName
deployment.Spec.DynamoNamespace = &dynamoNamespace
labels := make(map[string]string)
deployment.Spec.Labels = labels
deployment.Labels = labels
labels[commonconsts.KubeLabelDynamoComponent] = componentName
labels[commonconsts.KubeLabelDynamoNamespace] = dynamoNamespace
labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = parentDGD.Name
// only label worker DCDs with their hash for cleanup during rolling updates
if IsWorkerComponent(component.ComponentType) {
labels[commonconsts.KubeLabelDynamoWorkerHash] = rollingUpdateCtx.NewWorkerHash
}
// Propagate metrics annotation from parent deployment if present
if parentDGD.Annotations != nil {
if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
} }
if deployment.IsFrontendComponent() && defaultIngressSpec != nil && deployment.Spec.Ingress == nil { if val, exists := parentDGD.Annotations[commonconsts.KubeAnnotationEnableMetrics]; exists {
deployment.Spec.Ingress = defaultIngressSpec deployment.Spec.Annotations[commonconsts.KubeAnnotationEnableMetrics] = val
} }
// merge the envs from the parent deployment with the envs from the service if val, exists := parentDGD.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend]; exists {
if len(parentDynamoGraphDeployment.Spec.Envs) > 0 { deployment.Spec.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend] = val
deployment.Spec.Envs = MergeEnvs(parentDynamoGraphDeployment.Spec.Envs, deployment.Spec.Envs)
} }
err := updateDynDeploymentConfig(deployment, commonconsts.DynamoServicePort) // Propagate operator origin version for version-gated behavior in backends
if err != nil { if val, exists := parentDGD.Annotations[commonconsts.KubeAnnotationDynamoOperatorOriginVersion]; exists {
return nil, err deployment.Spec.Annotations[commonconsts.KubeAnnotationDynamoOperatorOriginVersion] = val
} }
err = overrideWithDynDeploymentConfig(ctx, deployment) }
if err != nil {
return nil, err // Apply restart annotation if this service should be restarted.
if restartState.ShouldAnnotateService(componentName) {
if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
} }
// we only override the replicas if it is not set in the CRD. deployment.Spec.Annotations[commonconsts.RestartAnnotation] = restartState.Timestamp
// replicas, if set in the CRD must always be the source of truth. } else if existingRestartAnnotations != nil {
if component.Replicas != nil { if existingRestartAt, ok := existingRestartAnnotations[componentName]; ok && existingRestartAt != "" {
deployment.Spec.Replicas = component.Replicas if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
}
deployment.Spec.Annotations[commonconsts.RestartAnnotation] = existingRestartAt
} }
deployments[componentName] = deployment
} }
return deployments, nil
}
func GetDynamoNamespace(object metav1.Object, service *v1alpha1.DynamoComponentDeploymentSharedSpec) string { if component.ComponentType == commonconsts.ComponentTypePlanner {
return v1alpha1.ComputeDynamoNamespace(service.GlobalDynamoNamespace, object.GetNamespace(), object.GetName()) if deployment.Spec.ExtraPodSpec == nil {
deployment.Spec.ExtraPodSpec = &v1alpha1.ExtraPodSpec{}
}
if deployment.Spec.ExtraPodSpec.PodSpec == nil {
deployment.Spec.ExtraPodSpec.PodSpec = &corev1.PodSpec{}
}
deployment.Spec.ExtraPodSpec.PodSpec.ServiceAccountName = commonconsts.PlannerServiceAccountName
}
if deployment.IsFrontendComponent() && defaultIngressSpec != nil && deployment.Spec.Ingress == nil {
deployment.Spec.Ingress = defaultIngressSpec
}
if len(parentDGD.Spec.Envs) > 0 {
deployment.Spec.Envs = MergeEnvs(parentDGD.Spec.Envs, deployment.Spec.Envs)
}
if err := updateDynDeploymentConfig(deployment, commonconsts.DynamoServicePort); err != nil {
return nil, err
}
if err := overrideWithDynDeploymentConfig(ctx, deployment); err != nil {
return nil, err
}
// during a rolling update, the replica count is determined by the rollingUpdateCtx instead of the component spec
if rollingUpdateCtx.InProgress() && IsWorkerComponent(component.ComponentType) && rollingUpdateCtx.NewWorkerReplicas[componentName] != 0 {
deployment.Spec.Replicas = ptr.To(rollingUpdateCtx.NewWorkerReplicas[componentName])
} else if component.Replicas != nil {
deployment.Spec.Replicas = component.Replicas
}
return deployment, nil
} }
// updateDynDeploymentConfig updates the runtime config object for the given dynamoDeploymentComponent // updateDynDeploymentConfig updates the runtime config object for the given dynamoDeploymentComponent
...@@ -449,8 +504,15 @@ func MergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar { ...@@ -449,8 +504,15 @@ func MergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
return merged return merged
} }
func GetDynamoComponentName(dynamoDeployment *v1alpha1.DynamoGraphDeployment, component string) string { // GetDCDResourceName returns the Kubernetes resource name for a DynamoComponentDeployment.
return fmt.Sprintf("%s-%s", dynamoDeployment.Name, strings.ToLower(component)) // If using for a non DCD resource (i.e. Ingress or VirtualService), use the empty string for the workerSuffix.
// For DCD Resources, Worker components include the workerSuffix; for non-workers, workerSuffix is ignored
func GetDCDResourceName(dgd *v1alpha1.DynamoGraphDeployment, serviceName string, workerSuffix string) string {
baseName := fmt.Sprintf("%s-%s", dgd.Name, strings.ToLower(serviceName))
if spec := dgd.Spec.Services[serviceName]; spec != nil && IsWorkerComponent(spec.ComponentType) && workerSuffix != "" {
return baseName + "-" + workerSuffix
}
return baseName
} }
type SecretsRetriever interface { type SecretsRetriever interface {
...@@ -555,7 +617,7 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy ...@@ -555,7 +617,7 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy
return nil, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", componentName) return nil, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", componentName)
} }
// DNS-safe service resource name: "{dgd-name}-{lowercase(componentName)}" // DNS-safe service resource name: "{dgd-name}-{lowercase(componentName)}"
kubeServiceName := GetDynamoComponentName(dynamoDeployment, componentName) kubeServiceName := GetDCDResourceName(dynamoDeployment, componentName, "")
var servicePort corev1.ServicePort var servicePort corev1.ServicePort
switch component.ComponentType { switch component.ComponentType {
...@@ -845,8 +907,8 @@ func MultinodeDeployerFactory(multinodeDeploymentType commonconsts.MultinodeDepl ...@@ -845,8 +907,8 @@ func MultinodeDeployerFactory(multinodeDeploymentType commonconsts.MultinodeDepl
} }
} }
// isWorkerComponent checks if a component is a worker that needs backend framework detection // IsWorkerComponent checks if a component is a worker that needs backend framework detection
func isWorkerComponent(componentType string) bool { func IsWorkerComponent(componentType string) bool {
return componentType == commonconsts.ComponentTypeWorker || return componentType == commonconsts.ComponentTypeWorker ||
componentType == commonconsts.ComponentTypePrefill || componentType == commonconsts.ComponentTypePrefill ||
componentType == commonconsts.ComponentTypeDecode componentType == commonconsts.ComponentTypeDecode
...@@ -1129,6 +1191,11 @@ func setMetricsLabels(labels map[string]string, dynamoGraphDeployment *v1alpha1. ...@@ -1129,6 +1191,11 @@ func setMetricsLabels(labels map[string]string, dynamoGraphDeployment *v1alpha1.
func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentSharedSpec, parentGraphDeploymentName string, namespace string, numberOfNodes int32, discoveryBackend string) ComponentContext { func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentSharedSpec, parentGraphDeploymentName string, namespace string, numberOfNodes int32, discoveryBackend string) ComponentContext {
dynamoNamespace := v1alpha1.ComputeDynamoNamespace(component.GlobalDynamoNamespace, namespace, parentGraphDeploymentName) dynamoNamespace := v1alpha1.ComputeDynamoNamespace(component.GlobalDynamoNamespace, namespace, parentGraphDeploymentName)
var workerHashSuffix string
if IsWorkerComponent(component.ComponentType) && component.Labels[commonconsts.KubeLabelDynamoWorkerHash] != "" {
workerHashSuffix = component.Labels[commonconsts.KubeLabelDynamoWorkerHash]
}
componentContext := ComponentContext{ componentContext := ComponentContext{
numberOfNodes: numberOfNodes, numberOfNodes: numberOfNodes,
ComponentType: component.ComponentType, ComponentType: component.ComponentType,
...@@ -1137,6 +1204,7 @@ func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentShare ...@@ -1137,6 +1204,7 @@ func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentShare
DiscoveryBackend: discoveryBackend, DiscoveryBackend: discoveryBackend,
DynamoNamespace: dynamoNamespace, DynamoNamespace: dynamoNamespace,
EPPConfig: component.EPPConfig, EPPConfig: component.EPPConfig,
WorkerHashSuffix: workerHashSuffix,
} }
return componentContext return componentContext
} }
...@@ -1316,7 +1384,7 @@ func GenerateGrovePodCliqueSet( ...@@ -1316,7 +1384,7 @@ func GenerateGrovePodCliqueSet(
func generateLabels(component *v1alpha1.DynamoComponentDeploymentSharedSpec, dynamoDeployment *v1alpha1.DynamoGraphDeployment, componentName string) (map[string]string, error) { func generateLabels(component *v1alpha1.DynamoComponentDeploymentSharedSpec, dynamoDeployment *v1alpha1.DynamoGraphDeployment, componentName string) (map[string]string, error) {
labels := make(map[string]string) labels := make(map[string]string)
labels[commonconsts.KubeLabelDynamoSelector] = GetDynamoComponentName(dynamoDeployment, componentName) labels[commonconsts.KubeLabelDynamoSelector] = GetDCDResourceName(dynamoDeployment, componentName, "")
labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = dynamoDeployment.Name labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = dynamoDeployment.Name
labels[commonconsts.KubeLabelDynamoComponent] = componentName labels[commonconsts.KubeLabelDynamoComponent] = componentName
if component.DynamoNamespace != nil { if component.DynamoNamespace != nil {
...@@ -1409,7 +1477,7 @@ func determineBackendFramework( ...@@ -1409,7 +1477,7 @@ func determineBackendFramework(
explicitBackendFramework string, explicitBackendFramework string,
) (BackendFramework, error) { ) (BackendFramework, error) {
// Check if this is a worker component - if not, use noop backend // Check if this is a worker component - if not, use noop backend
if !isWorkerComponent(componentType) { if !IsWorkerComponent(componentType) {
return BackendFrameworkNoop, nil return BackendFrameworkNoop, nil
} }
......
...@@ -126,6 +126,7 @@ func CheckPodCliqueReady(ctx context.Context, client client.Client, resourceName ...@@ -126,6 +126,7 @@ func CheckPodCliqueReady(ctx context.Context, client client.Client, resourceName
serviceStatus := v1alpha1.ServiceReplicaStatus{ serviceStatus := v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindPodClique, ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: resourceName, ComponentName: resourceName,
ComponentNames: []string{resourceName},
Replicas: podClique.Status.Replicas, Replicas: podClique.Status.Replicas,
UpdatedReplicas: podClique.Status.UpdatedReplicas, UpdatedReplicas: podClique.Status.UpdatedReplicas,
ReadyReplicas: &readyReplicas, ReadyReplicas: &readyReplicas,
...@@ -199,6 +200,7 @@ func CheckPCSGReady(ctx context.Context, client client.Client, resourceName, nam ...@@ -199,6 +200,7 @@ func CheckPCSGReady(ctx context.Context, client client.Client, resourceName, nam
serviceStatus := v1alpha1.ServiceReplicaStatus{ serviceStatus := v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup, ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: resourceName, ComponentName: resourceName,
ComponentNames: []string{resourceName},
Replicas: pcsg.Status.Replicas, Replicas: pcsg.Status.Replicas,
UpdatedReplicas: pcsg.Status.UpdatedReplicas, UpdatedReplicas: pcsg.Status.UpdatedReplicas,
AvailableReplicas: &availableReplicas, AvailableReplicas: &availableReplicas,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment