Unverified Commit aa1bc3c5 authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

feat(operator): DGD service status replica information (#4863)

parent f790e921
...@@ -10400,6 +10400,59 @@ spec: ...@@ -10400,6 +10400,59 @@ spec:
PodSelector contains the labels that can be used to select Pods belonging to PodSelector contains the labels that can be used to select Pods belonging to
this component deployment. this component deployment.
type: object type: object
service:
description: Service contains replica status information for this service.
properties:
availableReplicas:
description: |-
AvailableReplicas is the number of available replicas.
For Deployment: replicas ready for >= minReadySeconds.
For PodCliqueScalingGroup: replicas where all constituent PodCliques have >= MinAvailable ready pods.
Not available for PodClique or LeaderWorkerSet.
When nil, the field is omitted from the API response.
format: int32
minimum: 0
type: integer
componentKind:
description: ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet").
enum:
- PodClique
- PodCliqueScalingGroup
- Deployment
- LeaderWorkerSet
type: string
componentName:
description: ComponentName is the name of the underlying resource.
type: string
readyReplicas:
description: |-
ReadyReplicas is the number of ready replicas.
Populated for PodClique, Deployment, and LeaderWorkerSet.
Not available for PodCliqueScalingGroup.
When nil, the field is omitted from the API response.
format: int32
minimum: 0
type: integer
replicas:
description: |-
Replicas is the total number of non-terminated replicas.
Required for all component kinds.
format: int32
minimum: 0
type: integer
updatedReplicas:
description: |-
UpdatedReplicas is the number of replicas at the current/desired revision.
Required for all component kinds.
format: int32
minimum: 0
type: integer
required:
- componentKind
- componentName
- replicas
- updatedReplicas
type: object
required: required:
- conditions - conditions
type: object type: object
......
...@@ -10530,6 +10530,64 @@ spec: ...@@ -10530,6 +10530,64 @@ spec:
- type - type
type: object type: object
type: array type: array
services:
additionalProperties:
description: ServiceReplicaStatus contains replica information for a single service.
properties:
availableReplicas:
description: |-
AvailableReplicas is the number of available replicas.
For Deployment: replicas ready for >= minReadySeconds.
For PodCliqueScalingGroup: replicas where all constituent PodCliques have >= MinAvailable ready pods.
Not available for PodClique or LeaderWorkerSet.
When nil, the field is omitted from the API response.
format: int32
minimum: 0
type: integer
componentKind:
description: ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet").
enum:
- PodClique
- PodCliqueScalingGroup
- Deployment
- LeaderWorkerSet
type: string
componentName:
description: ComponentName is the name of the underlying resource.
type: string
readyReplicas:
description: |-
ReadyReplicas is the number of ready replicas.
Populated for PodClique, Deployment, and LeaderWorkerSet.
Not available for PodCliqueScalingGroup.
When nil, the field is omitted from the API response.
format: int32
minimum: 0
type: integer
replicas:
description: |-
Replicas is the total number of non-terminated replicas.
Required for all component kinds.
format: int32
minimum: 0
type: integer
updatedReplicas:
description: |-
UpdatedReplicas is the number of replicas at the current/desired revision.
Required for all component kinds.
format: int32
minimum: 0
type: integer
required:
- componentKind
- componentName
- replicas
- updatedReplicas
type: object
description: |-
Services contains per-service replica status information.
The map key is the service name from spec.services.
type: object
state: state:
description: State is a high-level textual status of the graph deployment lifecycle. description: State is a high-level textual status of the graph deployment lifecycle.
type: string type: string
......
...@@ -179,6 +179,10 @@ type DynamoComponentDeploymentStatus struct { ...@@ -179,6 +179,10 @@ type DynamoComponentDeploymentStatus struct {
// PodSelector contains the labels that can be used to select Pods belonging to // PodSelector contains the labels that can be used to select Pods belonging to
// this component deployment. // this component deployment.
PodSelector map[string]string `json:"podSelector,omitempty"` PodSelector map[string]string `json:"podSelector,omitempty"`
// Service contains replica status information for this service.
// +optional
Service ServiceReplicaStatus `json:"service,omitempty"`
} }
// +genclient // +genclient
...@@ -218,6 +222,10 @@ func (s *DynamoComponentDeployment) IsReady() (bool, string) { ...@@ -218,6 +222,10 @@ func (s *DynamoComponentDeployment) IsReady() (bool, string) {
return ready, reason return ready, reason
} }
func (s *DynamoComponentDeployment) GetServiceStatuses() map[string]ServiceReplicaStatus {
return map[string]ServiceReplicaStatus{s.Spec.ServiceName: s.Status.Service}
}
func (s *DynamoComponentDeploymentStatus) IsReady() (bool, string) { func (s *DynamoComponentDeploymentStatus) IsReady() (bool, string) {
for _, condition := range s.Conditions { for _, condition := range s.Conditions {
if condition.Type == DynamoGraphDeploymentConditionTypeAvailable && condition.Status == metav1.ConditionTrue { if condition.Type == DynamoGraphDeploymentConditionTypeAvailable && condition.Status == metav1.ConditionTrue {
......
...@@ -27,6 +27,21 @@ import ( ...@@ -27,6 +27,21 @@ import (
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// ComponentKind represents the type of underlying Kubernetes resource.
// +kubebuilder:validation:Enum=PodClique;PodCliqueScalingGroup;Deployment;LeaderWorkerSet
type ComponentKind string
const (
// ComponentKindPodClique represents a PodClique resource.
ComponentKindPodClique ComponentKind = "PodClique"
// ComponentKindPodCliqueScalingGroup represents a PodCliqueScalingGroup resource.
ComponentKindPodCliqueScalingGroup ComponentKind = "PodCliqueScalingGroup"
// ComponentKindDeployment represents a Deployment resource.
ComponentKindDeployment ComponentKind = "Deployment"
// ComponentKindLeaderWorkerSet represents a LeaderWorkerSet resource.
ComponentKindLeaderWorkerSet ComponentKind = "LeaderWorkerSet"
)
// DynamoGraphDeploymentSpec defines the desired state of DynamoGraphDeployment. // DynamoGraphDeploymentSpec defines the desired state of DynamoGraphDeployment.
type DynamoGraphDeploymentSpec struct { type DynamoGraphDeploymentSpec struct {
// PVCs defines a list of persistent volume claims that can be referenced by components. // PVCs defines a list of persistent volume claims that can be referenced by components.
...@@ -54,6 +69,45 @@ type DynamoGraphDeploymentStatus struct { ...@@ -54,6 +69,45 @@ type DynamoGraphDeploymentStatus struct {
// Conditions contains the latest observed conditions of the graph deployment. // Conditions contains the latest observed conditions of the graph deployment.
// The slice is merged by type on patch updates. // The slice is merged by type on patch updates.
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
// Services contains per-service replica status information.
// The map key is the service name from spec.services.
// +optional
Services map[string]ServiceReplicaStatus `json:"services,omitempty"`
}
// ServiceReplicaStatus contains replica information for a single service.
type ServiceReplicaStatus struct {
// ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet").
ComponentKind ComponentKind `json:"componentKind"`
// ComponentName is the name of the underlying resource.
ComponentName string `json:"componentName"`
// Replicas is the total number of non-terminated replicas.
// Required for all component kinds.
// +kubebuilder:validation:Minimum=0
Replicas int32 `json:"replicas"`
// UpdatedReplicas is the number of replicas at the current/desired revision.
// Required for all component kinds.
// +kubebuilder:validation:Minimum=0
UpdatedReplicas int32 `json:"updatedReplicas"`
// ReadyReplicas is the number of ready replicas.
// Populated for PodClique, Deployment, and LeaderWorkerSet.
// Not available for PodCliqueScalingGroup.
// When nil, the field is omitted from the API response.
// +optional
// +kubebuilder:validation:Minimum=0
ReadyReplicas *int32 `json:"readyReplicas,omitempty"`
// AvailableReplicas is the number of available replicas.
// For Deployment: replicas ready for >= minReadySeconds.
// For PodCliqueScalingGroup: replicas where all constituent PodCliques have >= MinAvailable ready pods.
// Not available for PodClique or LeaderWorkerSet.
// When nil, the field is omitted from the API response.
// +optional
// +kubebuilder:validation:Minimum=0
AvailableReplicas *int32 `json:"availableReplicas,omitempty"`
} }
// +kubebuilder:object:root=true // +kubebuilder:object:root=true
......
...@@ -421,6 +421,7 @@ func (in *DynamoComponentDeploymentStatus) DeepCopyInto(out *DynamoComponentDepl ...@@ -421,6 +421,7 @@ func (in *DynamoComponentDeploymentStatus) DeepCopyInto(out *DynamoComponentDepl
(*out)[key] = val (*out)[key] = val
} }
} }
in.Service.DeepCopyInto(&out.Service)
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoComponentDeploymentStatus. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoComponentDeploymentStatus.
...@@ -768,6 +769,13 @@ func (in *DynamoGraphDeploymentStatus) DeepCopyInto(out *DynamoGraphDeploymentSt ...@@ -768,6 +769,13 @@ func (in *DynamoGraphDeploymentStatus) DeepCopyInto(out *DynamoGraphDeploymentSt
(*in)[i].DeepCopyInto(&(*out)[i]) (*in)[i].DeepCopyInto(&(*out)[i])
} }
} }
if in.Services != nil {
in, out := &in.Services, &out.Services
*out = make(map[string]ServiceReplicaStatus, len(*in))
for key, val := range *in {
(*out)[key] = *val.DeepCopy()
}
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentStatus. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentStatus.
...@@ -1214,6 +1222,31 @@ func (in *ScalingAdapter) DeepCopy() *ScalingAdapter { ...@@ -1214,6 +1222,31 @@ func (in *ScalingAdapter) DeepCopy() *ScalingAdapter {
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ServiceReplicaStatus) DeepCopyInto(out *ServiceReplicaStatus) {
*out = *in
if in.ReadyReplicas != nil {
in, out := &in.ReadyReplicas, &out.ReadyReplicas
*out = new(int32)
**out = **in
}
if in.AvailableReplicas != nil {
in, out := &in.AvailableReplicas, &out.AvailableReplicas
*out = new(int32)
**out = **in
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServiceReplicaStatus.
func (in *ServiceReplicaStatus) DeepCopy() *ServiceReplicaStatus {
if in == nil {
return nil
}
out := new(ServiceReplicaStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SharedMemorySpec) DeepCopyInto(out *SharedMemorySpec) { func (in *SharedMemorySpec) DeepCopyInto(out *SharedMemorySpec) {
*out = *in *out = *in
......
...@@ -10400,6 +10400,59 @@ spec: ...@@ -10400,6 +10400,59 @@ spec:
PodSelector contains the labels that can be used to select Pods belonging to PodSelector contains the labels that can be used to select Pods belonging to
this component deployment. this component deployment.
type: object type: object
service:
description: Service contains replica status information for this service.
properties:
availableReplicas:
description: |-
AvailableReplicas is the number of available replicas.
For Deployment: replicas ready for >= minReadySeconds.
For PodCliqueScalingGroup: replicas where all constituent PodCliques have >= MinAvailable ready pods.
Not available for PodClique or LeaderWorkerSet.
When nil, the field is omitted from the API response.
format: int32
minimum: 0
type: integer
componentKind:
description: ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet").
enum:
- PodClique
- PodCliqueScalingGroup
- Deployment
- LeaderWorkerSet
type: string
componentName:
description: ComponentName is the name of the underlying resource.
type: string
readyReplicas:
description: |-
ReadyReplicas is the number of ready replicas.
Populated for PodClique, Deployment, and LeaderWorkerSet.
Not available for PodCliqueScalingGroup.
When nil, the field is omitted from the API response.
format: int32
minimum: 0
type: integer
replicas:
description: |-
Replicas is the total number of non-terminated replicas.
Required for all component kinds.
format: int32
minimum: 0
type: integer
updatedReplicas:
description: |-
UpdatedReplicas is the number of replicas at the current/desired revision.
Required for all component kinds.
format: int32
minimum: 0
type: integer
required:
- componentKind
- componentName
- replicas
- updatedReplicas
type: object
required: required:
- conditions - conditions
type: object type: object
......
...@@ -10530,6 +10530,64 @@ spec: ...@@ -10530,6 +10530,64 @@ spec:
- type - type
type: object type: object
type: array type: array
services:
additionalProperties:
description: ServiceReplicaStatus contains replica information for a single service.
properties:
availableReplicas:
description: |-
AvailableReplicas is the number of available replicas.
For Deployment: replicas ready for >= minReadySeconds.
For PodCliqueScalingGroup: replicas where all constituent PodCliques have >= MinAvailable ready pods.
Not available for PodClique or LeaderWorkerSet.
When nil, the field is omitted from the API response.
format: int32
minimum: 0
type: integer
componentKind:
description: ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet").
enum:
- PodClique
- PodCliqueScalingGroup
- Deployment
- LeaderWorkerSet
type: string
componentName:
description: ComponentName is the name of the underlying resource.
type: string
readyReplicas:
description: |-
ReadyReplicas is the number of ready replicas.
Populated for PodClique, Deployment, and LeaderWorkerSet.
Not available for PodCliqueScalingGroup.
When nil, the field is omitted from the API response.
format: int32
minimum: 0
type: integer
replicas:
description: |-
Replicas is the total number of non-terminated replicas.
Required for all component kinds.
format: int32
minimum: 0
type: integer
updatedReplicas:
description: |-
UpdatedReplicas is the number of replicas at the current/desired revision.
Required for all component kinds.
format: int32
minimum: 0
type: integer
required:
- componentKind
- componentName
- replicas
- updatedReplicas
type: object
description: |-
Services contains per-service replica status information.
The map key is the service name from spec.services.
type: object
state: state:
description: State is a high-level textual status of the graph deployment lifecycle. description: State is a high-level textual status of the graph deployment lifecycle.
type: string type: string
......
...@@ -226,22 +226,125 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req ...@@ -226,22 +226,125 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req
} }
} }
modified := false
// Create the appropriate workload resource based on deployment type // Create the appropriate workload resource based on deployment type
var leaderWorkerSets []*leaderworkersetv1.LeaderWorkerSet var componentReconcileResult ComponentReconcileResult
var deployment *appsv1.Deployment
if r.Config.LWS.Enabled && dynamoComponentDeployment.IsMultinode() { if r.Config.LWS.Enabled && dynamoComponentDeployment.IsMultinode() {
componentReconcileResult, err = r.reconcileLeaderWorkerSetResources(ctx, dynamoComponentDeployment)
} else {
componentReconcileResult, err = r.reconcileDeploymentResources(ctx, dynamoComponentDeployment)
}
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to reconcile the resources: %w", err)
}
modified := componentReconcileResult.modified
// create or update api-server service
serviceModified, err := r.createOrUpdateOrDeleteServices(ctx, generateResourceOption{
dynamoComponentDeployment: dynamoComponentDeployment,
})
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to create or update the service: %w", err)
}
// create or update headless service for model endpoint discovery
componentMap := map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
dynamoComponentDeployment.Name: &dynamoComponentDeployment.Spec.DynamoComponentDeploymentSharedSpec,
}
if err := dynamo.ReconcileModelServicesForComponents(
ctx,
r,
dynamoComponentDeployment,
componentMap,
dynamoComponentDeployment.Namespace,
); err != nil {
logs.Error(err, "Failed to reconcile model service")
return ctrl.Result{}, err
}
// create or update api-server ingresses
ingressModified, err := r.createOrUpdateOrDeleteIngress(ctx, generateResourceOption{
dynamoComponentDeployment: dynamoComponentDeployment,
})
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to create or update the ingress: %w", err)
}
if serviceModified || ingressModified {
modified = true
}
if !modified {
r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeNormal, "UpdateDynamoGraphDeployment", "No changes to dynamo deployment %s", dynamoComponentDeployment.Name)
}
logs.Info("Finished reconciling.")
r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeNormal, "Update", "All resources updated!")
err = r.setStatusConditionAndServiceReplicaStatus(ctx, dynamoComponentDeployment, componentReconcileResult)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to set status condition and service replica status: %w", err)
}
return
}
type ComponentReconcileResult struct {
modified bool
status metav1.ConditionStatus
reason string
message string
serviceReplicaStatus v1alpha1.ServiceReplicaStatus
}
func (r *DynamoComponentDeploymentReconciler) reconcileDeploymentResources(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) (ComponentReconcileResult, error) {
logger := log.FromContext(ctx)
deploymentModified, deployment, err := r.createOrUpdateOrDeleteDeployments(ctx, generateResourceOption{
dynamoComponentDeployment: dynamoComponentDeployment,
})
if err != nil {
return ComponentReconcileResult{}, fmt.Errorf("failed to create or update the deployment: %w", err)
}
serviceReplicaStatus := v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: deployment.Name,
Replicas: deployment.Status.Replicas,
UpdatedReplicas: deployment.Status.UpdatedReplicas,
ReadyReplicas: &deployment.Status.ReadyReplicas,
AvailableReplicas: &deployment.Status.AvailableReplicas,
}
if IsDeploymentReady(deployment) {
logger.Info("Deployment is ready. Setting available status condition to true.")
return ComponentReconcileResult{
modified: deploymentModified,
status: metav1.ConditionTrue,
reason: "DeploymentReady",
message: "Deployment is ready",
serviceReplicaStatus: serviceReplicaStatus,
}, nil
}
return ComponentReconcileResult{
modified: deploymentModified,
status: metav1.ConditionFalse,
reason: "DeploymentNotReady",
message: "Deployment is not ready",
serviceReplicaStatus: serviceReplicaStatus,
}, nil
}
func (r *DynamoComponentDeploymentReconciler) reconcileLeaderWorkerSetResources(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) (ComponentReconcileResult, error) {
logger := log.FromContext(ctx)
desiredReplicas := int32(1) desiredReplicas := int32(1)
if dynamoComponentDeployment.Spec.Replicas != nil { if dynamoComponentDeployment.Spec.Replicas != nil {
desiredReplicas = *dynamoComponentDeployment.Spec.Replicas desiredReplicas = *dynamoComponentDeployment.Spec.Replicas
} }
anyModified := false anyModified := false
leaderWorkerSets := make([]*leaderworkersetv1.LeaderWorkerSet, 0, desiredReplicas)
for i := range int(desiredReplicas) { for i := range int(desiredReplicas) {
volcanoPodGroupModified, _, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*volcanov1beta1.PodGroup, bool, error) {
modified_, _, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*volcanov1beta1.PodGroup, bool, error) {
return r.generateVolcanoPodGroup(ctx, generateResourceOption{ return r.generateVolcanoPodGroup(ctx, generateResourceOption{
dynamoComponentDeployment: dynamoComponentDeployment, dynamoComponentDeployment: dynamoComponentDeployment,
isStealingTrafficDebugModeEnabled: false, isStealingTrafficDebugModeEnabled: false,
...@@ -249,16 +352,11 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req ...@@ -249,16 +352,11 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req
instanceID: &i, instanceID: &i,
}) })
}) })
if err != nil { if err != nil {
return ctrl.Result{}, err return ComponentReconcileResult{}, fmt.Errorf("failed to sync the PodGroup: %w", err)
}
if modified_ {
anyModified = true
} }
modified_, lwsObj, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*leaderworkersetv1.LeaderWorkerSet, bool, error) { leaderWorkerSetModified, lwsObj, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*leaderworkersetv1.LeaderWorkerSet, bool, error) {
return r.generateLeaderWorkerSet(ctx, generateResourceOption{ return r.generateLeaderWorkerSet(ctx, generateResourceOption{
dynamoComponentDeployment: dynamoComponentDeployment, dynamoComponentDeployment: dynamoComponentDeployment,
isStealingTrafficDebugModeEnabled: false, isStealingTrafficDebugModeEnabled: false,
...@@ -266,15 +364,13 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req ...@@ -266,15 +364,13 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req
instanceID: &i, instanceID: &i,
}) })
}) })
if err != nil { if err != nil {
return ctrl.Result{}, err return ComponentReconcileResult{}, fmt.Errorf("failed to sync the LeaderWorkerSet: %w", err)
} }
if modified_ { if leaderWorkerSetModified || volcanoPodGroupModified {
anyModified = true anyModified = true
} }
leaderWorkerSets = append(leaderWorkerSets, lwsObj) leaderWorkerSets = append(leaderWorkerSets, lwsObj)
} }
...@@ -293,12 +389,12 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req ...@@ -293,12 +389,12 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req
if k8serrors.IsNotFound(err) { if k8serrors.IsNotFound(err) {
break break
} }
return ctrl.Result{}, err return ComponentReconcileResult{}, fmt.Errorf("failed to get the LeaderWorkerSet for deletion: %w", err)
} }
err = r.Delete(ctx, lwsToDelete) err = r.Delete(ctx, lwsToDelete)
if err != nil { if err != nil {
return ctrl.Result{}, err return ComponentReconcileResult{}, fmt.Errorf("failed to delete the LeaderWorkerSet: %w", err)
} }
podGroupName := nextLWSName podGroupName := nextLWSName
...@@ -310,126 +406,94 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req ...@@ -310,126 +406,94 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req
if err != nil { if err != nil {
if !k8serrors.IsNotFound(err) { if !k8serrors.IsNotFound(err) {
logs.Error(err, "Failed to get PodGroup for deletion", "podGroupName", podGroupName) logger.Error(err, "Failed to get PodGroup for deletion", "podGroupName", podGroupName)
} }
} else { } else {
err = r.Delete(ctx, podGroupToDelete) err = r.Delete(ctx, podGroupToDelete)
if err != nil { if err != nil {
logs.Error(err, "Failed to delete PodGroup", "podGroupName", podGroupName) logger.Error(err, "Failed to delete PodGroup", "podGroupName", podGroupName)
} }
} }
anyModified = true anyModified = true
} }
modified = anyModified allReady := true
lwsReplicaStatuses := []v1alpha1.ServiceReplicaStatus{}
} else { for _, leaderWorkerSet := range leaderWorkerSets {
modified_, obj, err := r.createOrUpdateOrDeleteDeployments(ctx, generateResourceOption{ if !IsLeaderWorkerSetReady(leaderWorkerSet) {
dynamoComponentDeployment: dynamoComponentDeployment, allReady = false
})
if err != nil {
return ctrl.Result{}, err
} }
lwsReplicaStatuses = append(lwsReplicaStatuses, getLeaderWorkerSetReplicasStatus(leaderWorkerSet))
if modified_ {
modified = true
} }
deployment = obj if allReady {
} return ComponentReconcileResult{
modified: anyModified,
status: metav1.ConditionTrue,
reason: "AllLeaderWorkerSetsReady",
message: "All LeaderWorkerSets are ready",
serviceReplicaStatus: combineLWSReplicaStatuses(lwsReplicaStatuses),
}, nil
}
return ComponentReconcileResult{
modified: anyModified,
status: metav1.ConditionFalse,
reason: "SomeLeaderWorkerSetsNotReady",
message: "Some LeaderWorkerSets are not ready",
serviceReplicaStatus: combineLWSReplicaStatuses(lwsReplicaStatuses),
}, nil
// create or update api-server service }
modified_, err := r.createOrUpdateOrDeleteServices(ctx, generateResourceOption{
dynamoComponentDeployment: dynamoComponentDeployment,
})
if err != nil {
return
}
if modified_ { func (r *DynamoComponentDeploymentReconciler) setStatusConditionAndServiceReplicaStatus(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, componentReconcileResult ComponentReconcileResult) error {
modified = true condition := metav1.Condition{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: componentReconcileResult.status,
Reason: componentReconcileResult.reason,
Message: componentReconcileResult.message,
} }
// create or update headless service for model endpoint discovery meta.SetStatusCondition(&dynamoComponentDeployment.Status.Conditions, condition)
componentMap := map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{ dynamoComponentDeployment.Status.Service = componentReconcileResult.serviceReplicaStatus
dynamoComponentDeployment.Name: &dynamoComponentDeployment.Spec.DynamoComponentDeploymentSharedSpec,
}
if err := dynamo.ReconcileModelServicesForComponents(
ctx,
r,
dynamoComponentDeployment,
componentMap,
dynamoComponentDeployment.Namespace,
); err != nil {
logs.Error(err, "Failed to reconcile model service")
return ctrl.Result{}, err
}
// create or update api-server ingresses err := r.Status().Update(ctx, dynamoComponentDeployment)
modified_, err = r.createOrUpdateOrDeleteIngress(ctx, generateResourceOption{
dynamoComponentDeployment: dynamoComponentDeployment,
})
if err != nil { if err != nil {
return return fmt.Errorf("failed to update DynamoComponentDeployment status: %w", err)
} }
return nil
}
if modified_ { func getLeaderWorkerSetReplicasStatus(leaderWorkerSet *leaderworkersetv1.LeaderWorkerSet) v1alpha1.ServiceReplicaStatus {
modified = true return v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: leaderWorkerSet.Name,
Replicas: leaderWorkerSet.Status.Replicas,
UpdatedReplicas: leaderWorkerSet.Status.UpdatedReplicas,
ReadyReplicas: &leaderWorkerSet.Status.ReadyReplicas,
} }
}
if !modified { func combineLWSReplicaStatuses(serviceReplicaStatuses []v1alpha1.ServiceReplicaStatus) v1alpha1.ServiceReplicaStatus {
r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeNormal, "UpdateDynamoGraphDeployment", "No changes to dynamo deployment %s", dynamoComponentDeployment.Name) if len(serviceReplicaStatuses) == 0 {
return v1alpha1.ServiceReplicaStatus{}
} }
logs.Info("Finished reconciling.") firstServiceStatus := serviceReplicaStatuses[0]
r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeNormal, "Update", "All resources updated!") var readyReplicas int32 = 0
if firstServiceStatus.ReadyReplicas != nil {
if dynamoComponentDeployment.IsMultinode() { readyReplicas = *firstServiceStatus.ReadyReplicas
err = r.computeAvailableStatusConditionForLeaderWorkerSets(ctx, req, leaderWorkerSets)
} else {
err = r.computeAvailableStatusCondition(ctx, req, deployment)
} }
for _, serviceReplicaStatus := range serviceReplicaStatuses[1:] {
return firstServiceStatus.Replicas += serviceReplicaStatus.Replicas
} firstServiceStatus.UpdatedReplicas += serviceReplicaStatus.UpdatedReplicas
if serviceReplicaStatus.ReadyReplicas != nil {
// computeAvailableStatusConditionForLeaderWorkerSet updates the status condition based on LeaderWorkerSet readiness readyReplicas += *serviceReplicaStatus.ReadyReplicas
func (r *DynamoComponentDeploymentReconciler) computeAvailableStatusConditionForLeaderWorkerSets(ctx context.Context, req ctrl.Request, leaderWorkerSets []*leaderworkersetv1.LeaderWorkerSet) error {
logs := log.FromContext(ctx)
allReady := true
for _, leaderWorkerSet := range leaderWorkerSets {
if !IsLeaderWorkerSetReady(leaderWorkerSet) {
allReady = false
break
} }
} }
if allReady { firstServiceStatus.ReadyReplicas = &readyReplicas
logs.Info("All LeaderWorkerSets are ready. Setting available status condition to true.") return firstServiceStatus
_, err := r.setStatusConditions(ctx, req,
metav1.Condition{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
Reason: "AllLeaderWorkerSetsReady",
Message: "All LeaderWorkerSets are ready",
},
)
return err
} else {
logs.Info("Not all LeaderWorkerSets are ready. Setting available status condition to false.")
_, err := r.setStatusConditions(ctx, req,
metav1.Condition{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "LeaderWorkerSetsNotReady",
Message: "Not all LeaderWorkerSets are ready",
},
)
return err
}
} }
// IsLeaderWorkerSetReady determines if a LeaderWorkerSet is fully ready and available // IsLeaderWorkerSetReady determines if a LeaderWorkerSet is fully ready and available
...@@ -672,33 +736,6 @@ func (r *DynamoComponentDeploymentReconciler) FinalizeResource(ctx context.Conte ...@@ -672,33 +736,6 @@ func (r *DynamoComponentDeploymentReconciler) FinalizeResource(ctx context.Conte
return nil return nil
} }
func (r *DynamoComponentDeploymentReconciler) computeAvailableStatusCondition(ctx context.Context, req ctrl.Request, deployment *appsv1.Deployment) error {
logs := log.FromContext(ctx)
if IsDeploymentReady(deployment) {
logs.Info("Deployment is ready. Setting available status condition to true.")
_, err := r.setStatusConditions(ctx, req,
metav1.Condition{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
Reason: "DeploymentReady",
Message: "Deployment is ready",
},
)
return err
} else {
logs.Info("Deployment is not ready. Setting available status condition to false.")
_, err := r.setStatusConditions(ctx, req,
metav1.Condition{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "DeploymentNotReady",
Message: "Deployment is not ready",
},
)
return err
}
}
// IsDeploymentReady determines if a Kubernetes Deployment is fully ready and available. // IsDeploymentReady determines if a Kubernetes Deployment is fully ready and available.
// It checks various status fields to ensure all replicas are available and the deployment // It checks various status fields to ensure all replicas are available and the deployment
// configuration has been fully applied. // configuration has been fully applied.
......
...@@ -42,6 +42,7 @@ import ( ...@@ -42,6 +42,7 @@ import (
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/fake"
leaderworkersetv1 "sigs.k8s.io/lws/api/leaderworkerset/v1" leaderworkersetv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"
...@@ -1255,3 +1256,664 @@ func TestDynamoComponentDeploymentReconciler_createOrUpdateOrDeleteDeployments_R ...@@ -1255,3 +1256,664 @@ func TestDynamoComponentDeploymentReconciler_createOrUpdateOrDeleteDeployments_R
g.Expect(reconciledDeployment.Spec.Replicas).NotTo(gomega.BeNil()) g.Expect(reconciledDeployment.Spec.Replicas).NotTo(gomega.BeNil())
g.Expect(*reconciledDeployment.Spec.Replicas).To(gomega.Equal(int32(1)), "Deployment should have been reconciled back to 1 replica") g.Expect(*reconciledDeployment.Spec.Replicas).To(gomega.Equal(int32(1)), "Deployment should have been reconciled back to 1 replica")
} }
func Test_reconcileLeaderWorkerSetResources(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
replicas int32
existingLeaderWorkerSets []*leaderworkersetv1.LeaderWorkerSet
wantComponentReconcileResult ComponentReconcileResult
}{
{
name: "singular LWS replica ready",
replicas: 1,
existingLeaderWorkerSets: []*leaderworkersetv1.LeaderWorkerSet{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component-0",
Namespace: "default",
},
Spec: leaderworkersetv1.LeaderWorkerSetSpec{
Replicas: ptr.To(int32(1)),
},
Status: leaderworkersetv1.LeaderWorkerSetStatus{
ReadyReplicas: 1,
UpdatedReplicas: 1,
Replicas: 1,
Conditions: []metav1.Condition{
{
Type: string(leaderworkersetv1.LeaderWorkerSetAvailable),
Status: metav1.ConditionTrue,
},
},
},
},
},
wantComponentReconcileResult: ComponentReconcileResult{
modified: true,
status: metav1.ConditionTrue,
reason: "AllLeaderWorkerSetsReady",
message: "All LeaderWorkerSets are ready",
serviceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0",
ReadyReplicas: ptr.To(int32(1)),
UpdatedReplicas: 1,
Replicas: 1,
},
},
},
{
name: "multiple LWS replicas - at least one is unready",
replicas: 3,
existingLeaderWorkerSets: []*leaderworkersetv1.LeaderWorkerSet{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component-0",
Namespace: "default",
},
Spec: leaderworkersetv1.LeaderWorkerSetSpec{
Replicas: ptr.To(int32(1)),
},
Status: leaderworkersetv1.LeaderWorkerSetStatus{
ReadyReplicas: 1,
Replicas: 1,
UpdatedReplicas: 1,
Conditions: []metav1.Condition{
{
Type: string(leaderworkersetv1.LeaderWorkerSetAvailable),
Status: metav1.ConditionTrue,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component-1",
Namespace: "default",
},
Spec: leaderworkersetv1.LeaderWorkerSetSpec{
Replicas: ptr.To(int32(1)),
},
Status: leaderworkersetv1.LeaderWorkerSetStatus{
ReadyReplicas: 0, // Not ready
Replicas: 1,
UpdatedReplicas: 0,
Conditions: []metav1.Condition{
{
Type: string(leaderworkersetv1.LeaderWorkerSetAvailable),
Status: metav1.ConditionFalse,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component-2",
Namespace: "default",
},
Spec: leaderworkersetv1.LeaderWorkerSetSpec{
Replicas: ptr.To(int32(1)),
},
Status: leaderworkersetv1.LeaderWorkerSetStatus{
ReadyReplicas: 1,
Replicas: 1,
UpdatedReplicas: 1,
Conditions: []metav1.Condition{
{
Type: string(leaderworkersetv1.LeaderWorkerSetAvailable),
Status: metav1.ConditionTrue,
},
},
},
},
},
wantComponentReconcileResult: ComponentReconcileResult{
modified: true,
status: metav1.ConditionFalse,
reason: "SomeLeaderWorkerSetsNotReady",
message: "Some LeaderWorkerSets are not ready",
serviceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0",
ReadyReplicas: ptr.To(int32(2)),
UpdatedReplicas: 2,
Replicas: 3,
},
},
},
{
name: "multiple LWS replicas - all ready",
replicas: 3,
existingLeaderWorkerSets: []*leaderworkersetv1.LeaderWorkerSet{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component-0",
Namespace: "default",
},
Spec: leaderworkersetv1.LeaderWorkerSetSpec{
Replicas: ptr.To(int32(1)),
},
Status: leaderworkersetv1.LeaderWorkerSetStatus{
ReadyReplicas: 1,
Replicas: 1,
UpdatedReplicas: 1,
Conditions: []metav1.Condition{
{
Type: string(leaderworkersetv1.LeaderWorkerSetAvailable),
Status: metav1.ConditionTrue,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component-1",
Namespace: "default",
},
Spec: leaderworkersetv1.LeaderWorkerSetSpec{
Replicas: ptr.To(int32(1)),
},
Status: leaderworkersetv1.LeaderWorkerSetStatus{
ReadyReplicas: 1,
Replicas: 1,
UpdatedReplicas: 1,
Conditions: []metav1.Condition{
{
Type: string(leaderworkersetv1.LeaderWorkerSetAvailable),
Status: metav1.ConditionTrue,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component-2",
Namespace: "default",
},
Spec: leaderworkersetv1.LeaderWorkerSetSpec{
Replicas: ptr.To(int32(1)),
},
Status: leaderworkersetv1.LeaderWorkerSetStatus{
ReadyReplicas: 1,
Replicas: 1,
UpdatedReplicas: 1,
Conditions: []metav1.Condition{
{
Type: string(leaderworkersetv1.LeaderWorkerSetAvailable),
Status: metav1.ConditionTrue,
},
},
},
},
},
wantComponentReconcileResult: ComponentReconcileResult{
modified: true,
status: metav1.ConditionTrue,
reason: "AllLeaderWorkerSetsReady",
message: "All LeaderWorkerSets are ready",
serviceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0",
ReadyReplicas: ptr.To(int32(3)),
UpdatedReplicas: 3,
Replicas: 3,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
// Create a scheme with necessary types
s := scheme.Scheme
err := v1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
err = leaderworkersetv1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
err = volcanov1beta1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
// Create DynamoComponentDeployment
dcd := &v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: string(dynamo.BackendFrameworkVLLM),
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "test-service",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: &tt.replicas,
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 2,
},
Resources: &v1alpha1.Resources{
Limits: &v1alpha1.ResourceItem{
GPU: "1",
},
},
ExtraPodSpec: &v1alpha1.ExtraPodSpec{
MainContainer: &corev1.Container{
Image: "test-image:latest",
Args: []string{
"--test-arg",
},
},
},
},
},
}
// Prepare objects for fake client
var objects []client.Object
objects = append(objects, dcd)
for _, lws := range tt.existingLeaderWorkerSets {
objects = append(objects, lws)
}
// Add a mock ServiceAccount that the generateLeaderWorkerSet function needs
objects = append(objects, &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "default-test-sa",
Namespace: "default",
Labels: map[string]string{
commonconsts.KubeLabelDynamoComponentPod: commonconsts.KubeLabelValueTrue,
},
},
})
// Set up fake client with the DCD and existing LWS objects
fakeKubeClient := fake.NewClientBuilder().
WithScheme(s).
WithObjects(objects...).
WithStatusSubresource(objects...).
Build()
// Set up reconciler
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoComponentDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
EtcdStorage: nil,
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
},
},
}
// Call the function under test
result, err := reconciler.reconcileLeaderWorkerSetResources(ctx, dcd)
g.Expect(err).NotTo(gomega.HaveOccurred())
// Assert the ComponentReconcileResult
g.Expect(result).To(gomega.Equal(tt.wantComponentReconcileResult))
})
}
}
func Test_reconcileDeploymentResources(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
replicas int32
existingDeployment *appsv1.Deployment
wantComponentReconcileResult ComponentReconcileResult
}{
{
name: "ready deployment",
replicas: 2,
existingDeployment: &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component",
Namespace: "default",
Generation: 1,
},
Spec: appsv1.DeploymentSpec{
Replicas: ptr.To(int32(2)),
},
Status: appsv1.DeploymentStatus{
ObservedGeneration: 1,
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: 2,
AvailableReplicas: 2,
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionTrue,
},
},
},
},
wantComponentReconcileResult: ComponentReconcileResult{
modified: true,
status: metav1.ConditionTrue,
reason: "DeploymentReady",
message: "Deployment is ready",
serviceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-component",
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
AvailableReplicas: ptr.To(int32(2)),
},
},
},
{
name: "unready deployment",
replicas: 1,
existingDeployment: &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component",
Namespace: "default",
Generation: 1,
},
Spec: appsv1.DeploymentSpec{
Replicas: ptr.To(int32(1)),
},
Status: appsv1.DeploymentStatus{
ObservedGeneration: 1,
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: 1,
AvailableReplicas: 0, // Not available
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionFalse,
},
},
},
},
wantComponentReconcileResult: ComponentReconcileResult{
modified: true,
status: metav1.ConditionFalse,
reason: "DeploymentNotReady",
message: "Deployment is not ready",
serviceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-component",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(0)),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
// Create a scheme with necessary types
s := scheme.Scheme
err := v1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
err = appsv1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
err = corev1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
// Create DynamoComponentDeployment
dcd := &v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: string(dynamo.BackendFrameworkVLLM),
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "test-service",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: &tt.replicas,
ExtraPodSpec: &v1alpha1.ExtraPodSpec{
MainContainer: &corev1.Container{
Image: "test-image:latest",
Args: []string{
"--test-arg",
},
},
},
},
},
}
// Prepare objects for fake client
var objects []client.Object
objects = append(objects, dcd)
if tt.existingDeployment != nil {
objects = append(objects, tt.existingDeployment)
}
// Set up fake client with the DCD and existing Deployment
fakeKubeClient := fake.NewClientBuilder().
WithScheme(s).
WithObjects(objects...).
WithStatusSubresource(objects...).
Build()
// Set up reconciler
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoComponentDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
EtcdStorage: nil,
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
},
},
}
// Call the function under test
result, err := reconciler.reconcileDeploymentResources(ctx, dcd)
g.Expect(err).NotTo(gomega.HaveOccurred())
// Assert the ComponentReconcileResult
g.Expect(result).To(gomega.Equal(tt.wantComponentReconcileResult))
})
}
}
func Test_setStatusConditionAndServiceReplicaStatus(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
componentReconcileResult ComponentReconcileResult
wantConditionStatus metav1.ConditionStatus
wantConditionReason string
wantConditionMessage string
wantServiceReplicaStatus v1alpha1.ServiceReplicaStatus
}{
{
name: "deployment backed DCD that is unready",
componentReconcileResult: ComponentReconcileResult{
modified: true,
status: metav1.ConditionFalse,
reason: "DeploymentNotReady",
message: "Deployment is not ready",
serviceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-component",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(0)),
},
},
wantConditionStatus: metav1.ConditionFalse,
wantConditionReason: "DeploymentNotReady",
wantConditionMessage: "Deployment is not ready",
wantServiceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-component",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(0)),
},
},
{
name: "deployment backed DCD that is ready",
componentReconcileResult: ComponentReconcileResult{
modified: true,
status: metav1.ConditionTrue,
reason: "DeploymentReady",
message: "Deployment is ready",
serviceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-component",
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
AvailableReplicas: ptr.To(int32(2)),
},
},
wantConditionStatus: metav1.ConditionTrue,
wantConditionReason: "DeploymentReady",
wantConditionMessage: "Deployment is ready",
wantServiceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-component",
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
AvailableReplicas: ptr.To(int32(2)),
},
},
{
name: "LWS backed DCD that is unready",
componentReconcileResult: ComponentReconcileResult{
modified: true,
status: metav1.ConditionFalse,
reason: "SomeLeaderWorkerSetsNotReady",
message: "Some LeaderWorkerSets are not ready",
serviceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0",
Replicas: 3,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
},
},
wantConditionStatus: metav1.ConditionFalse,
wantConditionReason: "SomeLeaderWorkerSetsNotReady",
wantConditionMessage: "Some LeaderWorkerSets are not ready",
wantServiceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0",
Replicas: 3,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
},
},
{
name: "LWS backed DCD that is ready",
componentReconcileResult: ComponentReconcileResult{
modified: true,
status: metav1.ConditionTrue,
reason: "AllLeaderWorkerSetsReady",
message: "All LeaderWorkerSets are ready",
serviceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0",
Replicas: 3,
UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)),
},
},
wantConditionStatus: metav1.ConditionTrue,
wantConditionReason: "AllLeaderWorkerSetsReady",
wantConditionMessage: "All LeaderWorkerSets are ready",
wantServiceReplicaStatus: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindLeaderWorkerSet,
ComponentName: "test-component-0",
Replicas: 3,
UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
// Create a scheme with necessary types
s := scheme.Scheme
err := v1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
// Create DynamoComponentDeployment
dcd := &v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-component",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: string(dynamo.BackendFrameworkVLLM),
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "test-service",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeDecode),
},
},
}
// Set up fake client with the DCD
fakeKubeClient := fake.NewClientBuilder().
WithScheme(s).
WithObjects(dcd).
WithStatusSubresource(dcd).
Build()
// Set up reconciler
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoComponentDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
}
// Create the request
req := ctrl.Request{
NamespacedName: client.ObjectKey{
Name: "test-component",
Namespace: "default",
},
}
err = reconciler.setStatusConditionAndServiceReplicaStatus(ctx, dcd, tt.componentReconcileResult)
g.Expect(err).NotTo(gomega.HaveOccurred())
// Fetch the updated DCD to verify status was set
updatedDCD := &v1alpha1.DynamoComponentDeployment{}
err = fakeKubeClient.Get(ctx, req.NamespacedName, updatedDCD)
g.Expect(err).NotTo(gomega.HaveOccurred())
// Assert the status condition
g.Expect(updatedDCD.Status.Conditions).To(gomega.HaveLen(1))
condition := updatedDCD.Status.Conditions[0]
g.Expect(condition.Type).To(gomega.Equal(v1alpha1.DynamoGraphDeploymentConditionTypeAvailable))
g.Expect(condition.Status).To(gomega.Equal(tt.wantConditionStatus))
g.Expect(condition.Reason).To(gomega.Equal(tt.wantConditionReason))
g.Expect(condition.Message).To(gomega.Equal(tt.wantConditionMessage))
// Assert the service replica status
g.Expect(updatedDCD.Status.Service).To(gomega.Equal(tt.wantServiceReplicaStatus))
})
}
}
...@@ -20,6 +20,7 @@ package controller ...@@ -20,6 +20,7 @@ package controller
import ( import (
"context" "context"
"fmt" "fmt"
"sort"
"strings" "strings"
grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1" grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
...@@ -45,9 +46,10 @@ import ( ...@@ -45,9 +46,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common" commoncontroller "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
webhookvalidation "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/webhook/validation" webhookvalidation "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/webhook/validation"
rbacv1 "k8s.io/api/rbac/v1" rbacv1 "k8s.io/api/rbac/v1"
...@@ -75,7 +77,7 @@ type rbacManager interface { ...@@ -75,7 +77,7 @@ type rbacManager interface {
// DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object // DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object
type DynamoGraphDeploymentReconciler struct { type DynamoGraphDeploymentReconciler struct {
client.Client client.Client
Config commonController.Config Config commoncontroller.Config
Recorder record.EventRecorder Recorder record.EventRecorder
DockerSecretRetriever dockerSecretRetriever DockerSecretRetriever dockerSecretRetriever
ScaleClient scale.ScalesGetter ScaleClient scale.ScalesGetter
...@@ -174,7 +176,7 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr ...@@ -174,7 +176,7 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
} }
} }
deleted, err := commonController.HandleFinalizer(ctx, dynamoDeployment, r.Client, r) deleted, err := commoncontroller.HandleFinalizer(ctx, dynamoDeployment, r.Client, r)
if err != nil { if err != nil {
logger.Error(err, "failed to handle the finalizer") logger.Error(err, "failed to handle the finalizer")
reason = "failed_to_handle_the_finalizer" reason = "failed_to_handle_the_finalizer"
...@@ -183,7 +185,12 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr ...@@ -183,7 +185,12 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
if deleted { if deleted {
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
state, reason, message, err = r.reconcileResources(ctx, dynamoDeployment) reconcileResult, err := r.reconcileResources(ctx, dynamoDeployment)
state = reconcileResult.State
reason = reconcileResult.Reason
message = reconcileResult.Message
dynamoDeployment.Status.Services = reconcileResult.ServiceStatus
if err != nil { if err != nil {
logger.Error(err, "failed to reconcile the resources") logger.Error(err, "failed to reconcile the resources")
reason = "failed_to_reconcile_the_resources" reason = "failed_to_reconcile_the_resources"
...@@ -195,18 +202,26 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr ...@@ -195,18 +202,26 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
type Resource interface { type Resource interface {
IsReady() (ready bool, reason string) IsReady() (ready bool, reason string)
GetName() string GetName() string
GetServiceStatuses() map[string]v1alpha1.ServiceReplicaStatus
}
type ReconcileResult struct {
State State
Reason Reason
Message Message
ServiceStatus map[string]nvidiacomv1alpha1.ServiceReplicaStatus
} }
func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) { func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
// Ensure planner RBAC exists in cluster-wide mode // Ensure planner RBAC exists in cluster-wide mode
if r.Config.RestrictedNamespace == "" { if r.Config.RestrictedNamespace == "" {
if r.RBACManager == nil { if r.RBACManager == nil {
return "", "", "", fmt.Errorf("RBAC manager not initialized in cluster-wide mode") return ReconcileResult{}, fmt.Errorf("RBAC manager not initialized in cluster-wide mode")
} }
if r.Config.RBAC.PlannerClusterRoleName == "" { if r.Config.RBAC.PlannerClusterRoleName == "" {
return "", "", "", fmt.Errorf("planner ClusterRole name is required in cluster-wide mode") return ReconcileResult{}, fmt.Errorf("planner ClusterRole name is required in cluster-wide mode")
} }
if err := r.RBACManager.EnsureServiceAccountWithRBAC( if err := r.RBACManager.EnsureServiceAccountWithRBAC(
ctx, ctx,
...@@ -215,7 +230,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context ...@@ -215,7 +230,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
r.Config.RBAC.PlannerClusterRoleName, r.Config.RBAC.PlannerClusterRoleName,
); err != nil { ); err != nil {
logger.Error(err, "Failed to ensure planner RBAC") logger.Error(err, "Failed to ensure planner RBAC")
return "", "", "", fmt.Errorf("failed to ensure planner RBAC: %w", err) return ReconcileResult{}, fmt.Errorf("failed to ensure planner RBAC: %w", err)
} }
} }
...@@ -223,21 +238,21 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context ...@@ -223,21 +238,21 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
err := r.reconcilePVCs(ctx, dynamoDeployment) err := r.reconcilePVCs(ctx, dynamoDeployment)
if err != nil { if err != nil {
logger.Error(err, "Failed to reconcile top-level PVCs") logger.Error(err, "Failed to reconcile top-level PVCs")
return "", "", "", fmt.Errorf("failed to reconcile top-level PVCs: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile top-level PVCs: %w", err)
} }
// Reconcile DynamoGraphDeploymentScalingAdapters for each service // Reconcile DynamoGraphDeploymentScalingAdapters for each service
err = r.reconcileScalingAdapters(ctx, dynamoDeployment) err = r.reconcileScalingAdapters(ctx, dynamoDeployment)
if err != nil { if err != nil {
logger.Error(err, "Failed to reconcile scaling adapters") logger.Error(err, "Failed to reconcile scaling adapters")
return "", "", "", fmt.Errorf("failed to reconcile scaling adapters: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile scaling adapters: %w", err)
} }
// Reconcile the SA, Role and RoleBinding if k8s discovery is enabled // Reconcile the SA, Role and RoleBinding if k8s discovery is enabled
err = r.reconcileK8sDiscoveryResources(ctx, dynamoDeployment) err = r.reconcileK8sDiscoveryResources(ctx, dynamoDeployment)
if err != nil { if err != nil {
logger.Error(err, "Failed to reconcile K8s discovery resources") logger.Error(err, "Failed to reconcile K8s discovery resources")
return "", "", "", fmt.Errorf("failed to reconcile K8s discovery resources: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile K8s discovery resources: %w", err)
} }
// Orchestrator selection via single boolean annotation: nvidia.com/enable-grove // Orchestrator selection via single boolean annotation: nvidia.com/enable-grove
...@@ -256,7 +271,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context ...@@ -256,7 +271,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
err := r.MPISecretReplicator.Replicate(ctx, dynamoDeployment.Namespace) err := r.MPISecretReplicator.Replicate(ctx, dynamoDeployment.Namespace)
if err != nil { if err != nil {
logger.Error(err, "Failed to replicate MPI secret", "namespace", dynamoDeployment.Namespace) logger.Error(err, "Failed to replicate MPI secret", "namespace", dynamoDeployment.Namespace)
return "", "", "", fmt.Errorf("failed to replicate MPI secret: %w", err) return ReconcileResult{}, fmt.Errorf("failed to replicate MPI secret: %w", err)
} }
} }
...@@ -267,7 +282,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context ...@@ -267,7 +282,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
if hasMultinode && !r.Config.LWS.Enabled { if hasMultinode && !r.Config.LWS.Enabled {
err := fmt.Errorf("no multinode orchestrator available") err := fmt.Errorf("no multinode orchestrator available")
logger.Error(err, err.Error(), "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled) logger.Error(err, err.Error(), "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled)
return "", "", "", err return ReconcileResult{}, fmt.Errorf("failed to reconcile Dynamo components deployments: %w", err)
} }
logger.Info("Reconciling Dynamo components deployments", "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled) logger.Info("Reconciling Dynamo components deployments", "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled)
return r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment) return r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment)
...@@ -289,7 +304,7 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context ...@@ -289,7 +304,7 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context
} }
// Use the generic scaling function // Use the generic scaling function
err := commonController.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas) err := commoncontroller.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
// Resource doesn't exist yet - this is normal during initial creation when Grove is still creating the resources asynchronously // Resource doesn't exist yet - this is normal during initial creation when Grove is still creating the resources asynchronously
...@@ -300,6 +315,40 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context ...@@ -300,6 +315,40 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context
return err return err
} }
func (r *DynamoGraphDeploymentReconciler) reconcileGrovePodCliqueSet(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (*commoncontroller.Resource, error) {
logger := log.FromContext(ctx)
// generate the dynamoComponentsDeployments from the config
grovePodCliqueSet, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever)
if err != nil {
logger.Error(err, "failed to generate the Grove GangSet")
return nil, fmt.Errorf("failed to generate the Grove GangSet: %w", err)
}
_, syncedGrovePodCliqueSet, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*grovev1alpha1.PodCliqueSet, bool, error) {
return grovePodCliqueSet, false, nil
})
if err != nil {
logger.Error(err, "failed to sync the Grove GangSet")
return nil, fmt.Errorf("failed to sync the Grove GangSet: %w", err)
}
syncedGrovePodCliqueSetAsResource, err := commoncontroller.NewResourceWithServiceStatuses(
syncedGrovePodCliqueSet,
func() (bool, string, map[string]v1alpha1.ServiceReplicaStatus) {
// Grove readiness: all underlying PodCliques and PodCliqueScalingGroups have replicas == availableReplicas
allComponentsReady, reason, serviceStatuses := dynamo.GetComponentReadinessAndServiceReplicaStatuses(ctx, r.Client, dynamoDeployment)
if !allComponentsReady {
return false, reason, serviceStatuses
}
return true, "", serviceStatuses
},
)
if err != nil {
logger.Error(err, "failed to create the Grove PodClique Set resource")
return nil, fmt.Errorf("failed to create the Grove PodClique Set resource: %w", err)
}
return syncedGrovePodCliqueSetAsResource, nil
}
// reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes // reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes
func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error { func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
...@@ -348,38 +397,19 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Cont ...@@ -348,38 +397,19 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Cont
return nil return nil
} }
func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) { func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
// generate the dynamoComponentsDeployments from the config grovePodCliqueSetAsResource, err := r.reconcileGrovePodCliqueSet(ctx, dynamoDeployment)
groveGangSet, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever)
if err != nil {
logger.Error(err, "failed to generate the Grove GangSet")
return "", "", "", fmt.Errorf("failed to generate the Grove GangSet: %w", err)
}
_, syncedGroveGangSet, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*grovev1alpha1.PodCliqueSet, bool, error) {
return groveGangSet, false, nil
})
if err != nil { if err != nil {
logger.Error(err, "failed to sync the Grove GangSet") logger.Error(err, "failed to reconcile the Grove PodClique Set")
return "", "", "", fmt.Errorf("failed to sync the Grove GangSet: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile the Grove PodClique Set: %w", err)
} }
groveGangSetAsResource := commonController.WrapResource(
syncedGroveGangSet,
func() (bool, string) {
// Grove readiness: all underlying PodCliques and PodCliqueScalingGroups have replicas == availableReplicas
allComponentsReady, reason := dynamo.EvaluateAllComponentsReady(ctx, r.Client, dynamoDeployment)
if !allComponentsReady {
return false, reason
}
return true, ""
},
)
// Handle Grove scaling operations after structural changes // Handle Grove scaling operations after structural changes
if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil { if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil {
logger.Error(err, "failed to reconcile Grove scaling") logger.Error(err, "failed to reconcile Grove scaling")
return "", "", "", fmt.Errorf("failed to reconcile Grove scaling: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile Grove scaling: %w", err)
} }
// Reconcile headless services for model endpoint discovery // Reconcile headless services for model endpoint discovery
...@@ -391,10 +421,10 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -391,10 +421,10 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
dynamoDeployment.Namespace, dynamoDeployment.Namespace,
); err != nil { ); err != nil {
logger.Error(err, "failed to reconcile model services") logger.Error(err, "failed to reconcile model services")
return "", "", "", fmt.Errorf("failed to reconcile model services: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile model services: %w", err)
} }
resources := []Resource{groveGangSetAsResource} resources := []Resource{grovePodCliqueSetAsResource}
for componentName, component := range dynamoDeployment.Spec.Services { for componentName, component := range dynamoDeployment.Spec.Services {
// if k8s discovery is enabled, create a service for each component // if k8s discovery is enabled, create a service for each component
...@@ -404,21 +434,26 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -404,21 +434,26 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamoDeployment, component, componentName, isK8sDiscoveryEnabled) mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamoDeployment, component, componentName, isK8sDiscoveryEnabled)
if err != nil { if err != nil {
logger.Error(err, "failed to generate the main component service") logger.Error(err, "failed to generate the main component service")
return "", "", "", fmt.Errorf("failed to generate the main component service: %w", err) return ReconcileResult{}, fmt.Errorf("failed to generate the main component service: %w", err)
} }
_, syncedMainComponentService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.Service, bool, error) { _, syncedMainComponentService, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
return mainComponentService, false, nil return mainComponentService, false, nil
}) })
if err != nil { if err != nil {
logger.Error(err, "failed to sync the main component service") logger.Error(err, "failed to sync the main component service")
return "", "", "", fmt.Errorf("failed to sync the main component service: %w", err) return ReconcileResult{}, fmt.Errorf("failed to sync the main component service: %w", err)
} }
mainComponentServiceAsResource := commonController.WrapResource(syncedMainComponentService, if syncedMainComponentService != nil {
mainComponentServiceAsResource, err := commoncontroller.NewResource(syncedMainComponentService,
func() (bool, string) { func() (bool, string) {
return true, "" return true, ""
}) })
if err != nil {
return ReconcileResult{}, fmt.Errorf("failed to sync the main component service: %w", err)
}
resources = append(resources, mainComponentServiceAsResource) resources = append(resources, mainComponentServiceAsResource)
} }
}
if component.ComponentType == consts.ComponentTypeFrontend { if component.ComponentType == consts.ComponentTypeFrontend {
// generate the main component ingress // generate the main component ingress
...@@ -427,7 +462,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -427,7 +462,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
ingressSpec = *component.Ingress ingressSpec = *component.Ingress
} }
mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec) mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
_, syncedMainComponentIngress, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) { _, syncedMainComponentIngress, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil { if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
logger.Info("Ingress is not enabled") logger.Info("Ingress is not enabled")
return mainComponentIngress, true, nil return mainComponentIngress, true, nil
...@@ -436,16 +471,22 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -436,16 +471,22 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
}) })
if err != nil { if err != nil {
logger.Error(err, "failed to sync the main component ingress") logger.Error(err, "failed to sync the main component ingress")
return "", "", "", fmt.Errorf("failed to sync the main component ingress: %w", err) return ReconcileResult{}, fmt.Errorf("failed to sync the main component ingress: %w", err)
} }
resources = append(resources, commonController.WrapResource(syncedMainComponentIngress, if syncedMainComponentIngress != nil {
mainComponentIngressAsResource, err := commoncontroller.NewResource(syncedMainComponentIngress,
func() (bool, string) { func() (bool, string) {
return true, "" return true, ""
})) })
if err != nil {
return ReconcileResult{}, fmt.Errorf("failed to create the main component ingress resource: %w", err)
}
resources = append(resources, mainComponentIngressAsResource)
}
// generate the main component virtual service // generate the main component virtual service
if r.Config.IngressConfig.UseVirtualService() { if r.Config.IngressConfig.UseVirtualService() {
mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec) mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
_, syncedMainComponentVirtualService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) { _, syncedMainComponentVirtualService, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
if !ingressSpec.IsVirtualServiceEnabled() { if !ingressSpec.IsVirtualServiceEnabled() {
logger.Info("VirtualService is not enabled") logger.Info("VirtualService is not enabled")
return mainComponentVirtualService, true, nil return mainComponentVirtualService, true, nil
...@@ -454,24 +495,41 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -454,24 +495,41 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
}) })
if err != nil { if err != nil {
logger.Error(err, "failed to sync the main component virtual service") logger.Error(err, "failed to sync the main component virtual service")
return "", "", "", fmt.Errorf("failed to sync the main component virtual service: %w", err) return ReconcileResult{}, fmt.Errorf("failed to sync the main component virtual service: %w", err)
} }
resources = append(resources, commonController.WrapResource(syncedMainComponentVirtualService, if syncedMainComponentVirtualService != nil {
mainComponentVirtualServiceAsResource, err := commoncontroller.NewResource(syncedMainComponentVirtualService,
func() (bool, string) { func() (bool, string) {
return true, "" return true, ""
})) })
if err != nil {
return ReconcileResult{}, fmt.Errorf("failed to create the main component virtual service resource: %w", err)
}
resources = append(resources, mainComponentVirtualServiceAsResource)
} }
} }
} }
return r.checkResourcesReadiness(resources) }
return r.checkResourcesReadiness(resources), nil
} }
func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) (State, Reason, Message, error) { func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) ReconcileResult {
// Sort resources by name to ensure deterministic ordering
sort.Slice(resources, func(i, j int) bool {
return resources[i].GetName() < resources[j].GetName()
})
var notReadyReasons []string var notReadyReasons []string
notReadyResources := []string{} notReadyResources := []string{}
serviceStatuses := make(map[string]v1alpha1.ServiceReplicaStatus)
for _, resource := range resources { for _, resource := range resources {
ready, reason := resource.IsReady() ready, reason := resource.IsReady()
resourceServiceStatuses := resource.GetServiceStatuses()
for serviceName, serviceStatus := range resourceServiceStatuses {
serviceStatuses[serviceName] = serviceStatus
}
if !ready { if !ready {
notReadyResources = append(notReadyResources, resource.GetName()) notReadyResources = append(notReadyResources, resource.GetName())
notReadyReasons = append(notReadyReasons, fmt.Sprintf("%s: %s", resource.GetName(), reason)) notReadyReasons = append(notReadyReasons, fmt.Sprintf("%s: %s", resource.GetName(), reason))
...@@ -479,12 +537,22 @@ func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Re ...@@ -479,12 +537,22 @@ func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Re
} }
if len(notReadyResources) == 0 { if len(notReadyResources) == 0 {
return ReadyState, "all_resources_are_ready", Message("All resources are ready"), nil return ReconcileResult{
State: ReadyState,
Reason: "all_resources_are_ready",
Message: Message("All resources are ready"),
ServiceStatus: serviceStatuses,
}
}
return ReconcileResult{
State: PendingState,
Reason: "some_resources_are_not_ready",
Message: Message(fmt.Sprintf("Resources not ready: %s", strings.Join(notReadyReasons, "; "))),
ServiceStatus: serviceStatuses,
} }
return PendingState, "some_resources_are_not_ready", Message(fmt.Sprintf("Resources not ready: %s", strings.Join(notReadyReasons, "; "))), nil
} }
func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) { func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) {
resources := []Resource{} resources := []Resource{}
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
...@@ -493,23 +561,23 @@ func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(c ...@@ -493,23 +561,23 @@ func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(c
dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, &defaultIngressSpec) dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, &defaultIngressSpec)
if err != nil { if err != nil {
logger.Error(err, "failed to generate the DynamoComponentsDeployments") logger.Error(err, "failed to generate the DynamoComponentsDeployments")
return "", "", "", fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err) return ReconcileResult{}, fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err)
} }
// reconcile the dynamoComponentsDeployments // reconcile the dynamoComponentsDeployments
for serviceName, dynamoComponentDeployment := range dynamoComponentsDeployments { for serviceName, dynamoComponentDeployment := range dynamoComponentsDeployments {
logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", dynamoComponentDeployment) logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", dynamoComponentDeployment)
_, dynamoComponentDeployment, err = commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) { _, dynamoComponentDeployment, err = commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) {
return dynamoComponentDeployment, false, nil return dynamoComponentDeployment, false, nil
}) })
if err != nil { if err != nil {
logger.Error(err, "failed to sync the DynamoComponentDeployment") logger.Error(err, "failed to sync the DynamoComponentDeployment")
return "", "", "", fmt.Errorf("failed to sync the DynamoComponentDeployment: %w", err) return ReconcileResult{}, fmt.Errorf("failed to sync the DynamoComponentDeployment: %w", err)
} }
resources = append(resources, dynamoComponentDeployment) resources = append(resources, dynamoComponentDeployment)
} }
return r.checkResourcesReadiness(resources) return r.checkResourcesReadiness(resources), nil
} }
// reconcilePVC reconciles a single top-level PVC defined in the DynamoGraphDeployment spec // reconcilePVC reconciles a single top-level PVC defined in the DynamoGraphDeployment spec
...@@ -559,7 +627,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx con ...@@ -559,7 +627,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx con
} }
serviceAccount := discovery.GetK8sDiscoveryServiceAccount(dynamoDeployment.Name, dynamoDeployment.Namespace) serviceAccount := discovery.GetK8sDiscoveryServiceAccount(dynamoDeployment.Name, dynamoDeployment.Namespace)
_, _, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.ServiceAccount, bool, error) { _, _, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.ServiceAccount, bool, error) {
return serviceAccount, false, nil return serviceAccount, false, nil
}) })
if err != nil { if err != nil {
...@@ -568,7 +636,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx con ...@@ -568,7 +636,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx con
} }
role := discovery.GetK8sDiscoveryRole(dynamoDeployment.Name, dynamoDeployment.Namespace) role := discovery.GetK8sDiscoveryRole(dynamoDeployment.Name, dynamoDeployment.Namespace)
_, _, err = commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*rbacv1.Role, bool, error) { _, _, err = commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*rbacv1.Role, bool, error) {
return role, false, nil return role, false, nil
}) })
if err != nil { if err != nil {
...@@ -577,7 +645,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx con ...@@ -577,7 +645,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx con
} }
roleBinding := discovery.GetK8sDiscoveryRoleBinding(dynamoDeployment.Name, dynamoDeployment.Namespace) roleBinding := discovery.GetK8sDiscoveryRoleBinding(dynamoDeployment.Name, dynamoDeployment.Namespace)
_, _, err = commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*rbacv1.RoleBinding, bool, error) { _, _, err = commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*rbacv1.RoleBinding, bool, error) {
return roleBinding, false, nil return roleBinding, false, nil
}) })
if err != nil { if err != nil {
...@@ -634,7 +702,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileScalingAdapters(ctx context.C ...@@ -634,7 +702,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileScalingAdapters(ctx context.C
// Use SyncResource to handle creation/updates/deletion // Use SyncResource to handle creation/updates/deletion
// When toDelete=true, SyncResource will delete the existing resource if it exists // When toDelete=true, SyncResource will delete the existing resource if it exists
_, _, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter, bool, error) { _, _, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter, bool, error) {
adapterName := generateAdapterName(dynamoDeployment.Name, serviceName) adapterName := generateAdapterName(dynamoDeployment.Name, serviceName)
adapter := &nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter{ adapter := &nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
...@@ -731,7 +799,7 @@ func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) err ...@@ -731,7 +799,7 @@ func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) err
UpdateFunc: func(de event.UpdateEvent) bool { return true }, UpdateFunc: func(de event.UpdateEvent) bool { return true },
GenericFunc: func(ge event.GenericEvent) bool { return true }, GenericFunc: func(ge event.GenericEvent) bool { return true },
})). })).
WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config)) WithEventFilter(commoncontroller.EphemeralDeploymentEventFilter(r.Config))
if r.Config.Grove.Enabled { if r.Config.Grove.Enabled {
ctrlBuilder = ctrlBuilder.Owns(&grovev1alpha1.PodCliqueSet{}, builder.WithPredicates(predicate.Funcs{ ctrlBuilder = ctrlBuilder.Owns(&grovev1alpha1.PodCliqueSet{}, builder.WithPredicates(predicate.Funcs{
// ignore creation cause we don't want to be called again after we create the pod gang set // ignore creation cause we don't want to be called again after we create the pod gang set
......
...@@ -21,11 +21,18 @@ import ( ...@@ -21,11 +21,18 @@ import (
"context" "context"
"testing" "testing"
grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
"github.com/onsi/gomega"
autoscalingv1 "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
...@@ -252,7 +259,7 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T) ...@@ -252,7 +259,7 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T)
}, },
expectedAdapterCount: 1, expectedAdapterCount: 1,
expectedAdapters: map[string]int32{ expectedAdapters: map[string]int32{
"my-dgd-myservice": 1, // lowercase "my-dgd-myservice": 1,
}, },
}, },
} }
...@@ -319,3 +326,907 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T) ...@@ -319,3 +326,907 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T)
}) })
} }
} }
// mockScaleClient implements scale.ScalesGetter for testing
type mockScaleClient struct{}
func (m *mockScaleClient) Scales(namespace string) scale.ScaleInterface {
return &mockScaleInterface{}
}
// mockScaleInterface implements scale.ScaleInterface for testing
type mockScaleInterface struct{}
func (m *mockScaleInterface) Get(ctx context.Context, resource schema.GroupResource, name string, opts metav1.GetOptions) (*autoscalingv1.Scale, error) {
// Return a dummy scale object - we don't actually need scaling in the test
return &autoscalingv1.Scale{}, nil
}
func (m *mockScaleInterface) Update(ctx context.Context, resource schema.GroupResource, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error) {
// Return success without actually doing anything
return scale, nil
}
func (m *mockScaleInterface) Patch(ctx context.Context, gvr schema.GroupVersionResource, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*autoscalingv1.Scale, error) {
// Return a dummy scale object
return &autoscalingv1.Scale{}, nil
}
func Test_reconcileGroveResources(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
dgdSpec v1alpha1.DynamoGraphDeploymentSpec
existingGroveResources []client.Object
wantReconcileResult ReconcileResult
}{
{
name: "singular frontend service with 2 replicas - creates a PodClique with 2 replicas - ready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(2)),
},
},
},
existingGroveResources: []client.Object{
&grovev1alpha1.PodClique{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-frontend",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueSpec{
Replicas: 2,
},
Status: grovev1alpha1.PodCliqueStatus{
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: 2,
},
},
},
wantReconcileResult: ReconcileResult{
State: ReadyState,
Reason: "all_resources_are_ready",
Message: "All resources are ready",
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-frontend",
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
},
},
},
},
{
name: "frontend service with 1 replica, decode service with 2 replicas - 2 PodCliques - one unready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(1)),
},
"decode": {
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: ptr.To(int32(2)),
},
},
},
existingGroveResources: []client.Object{
&grovev1alpha1.PodClique{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-frontend",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueSpec{
Replicas: 1,
},
Status: grovev1alpha1.PodCliqueStatus{
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: 1,
},
},
&grovev1alpha1.PodClique{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-decode",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueSpec{
Replicas: 2,
},
Status: grovev1alpha1.PodCliqueStatus{
Replicas: 2,
UpdatedReplicas: 1,
ReadyReplicas: 1, // Only 1 ready, but 2 desired
},
},
},
wantReconcileResult: ReconcileResult{
State: PendingState,
Reason: "some_resources_are_not_ready",
Message: Message("Resources not ready: test-dgd: podclique/test-dgd-0-decode: desired=2, ready=1"),
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-frontend",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
},
"decode": {
ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-decode",
Replicas: 2,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
},
},
},
},
{
name: "decode worker multinode (PCSG), prefill worker multinode (PCSG) - both ready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"decode": {
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: ptr.To(int32(1)),
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 2,
},
},
"prefill": {
ComponentType: string(commonconsts.ComponentTypeWorker),
Replicas: ptr.To(int32(1)),
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 4,
},
},
},
},
existingGroveResources: []client.Object{
&grovev1alpha1.PodCliqueScalingGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-decode",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueScalingGroupSpec{
Replicas: 1,
},
Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
},
},
&grovev1alpha1.PodCliqueScalingGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-prefill",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueScalingGroupSpec{
Replicas: 1,
},
Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 1,
UpdatedReplicas: 1,
AvailableReplicas: 1,
},
},
},
wantReconcileResult: ReconcileResult{
State: ReadyState,
Reason: "all_resources_are_ready",
Message: "All resources are ready",
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"decode": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-decode",
Replicas: 1,
UpdatedReplicas: 1,
AvailableReplicas: ptr.To(int32(1)),
},
"prefill": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-prefill",
Replicas: 1,
UpdatedReplicas: 1,
AvailableReplicas: ptr.To(int32(1)),
},
},
},
},
{
name: "frontend worker (PodClique), aggregated worker multinode (PCSG) - PCSG unready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(1)),
},
"aggregated": {
ComponentType: string(commonconsts.ComponentTypeWorker),
Replicas: ptr.To(int32(2)),
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 8,
},
},
},
},
existingGroveResources: []client.Object{
&grovev1alpha1.PodClique{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-frontend",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueSpec{
Replicas: 1,
},
Status: grovev1alpha1.PodCliqueStatus{
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: 1,
},
},
&grovev1alpha1.PodCliqueScalingGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-aggregated",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueScalingGroupSpec{
Replicas: 2,
},
Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 1, // Only 1 available, but 2 desired
},
},
},
wantReconcileResult: ReconcileResult{
State: PendingState,
Reason: "some_resources_are_not_ready",
Message: Message("Resources not ready: test-dgd: pcsg/test-dgd-0-aggregated: desired=2, available=1"),
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-frontend",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
},
"aggregated": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-aggregated",
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: ptr.To(int32(1)),
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
s := scheme.Scheme
err := v1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
err = grovev1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
dgd := &v1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd",
Namespace: "default",
},
Spec: tt.dgdSpec,
}
var objects []client.Object
objects = append(objects, dgd)
objects = append(objects, tt.existingGroveResources...)
fakeKubeClient := fake.NewClientBuilder().
WithScheme(s).
WithObjects(objects...).
WithStatusSubresource(objects...).
Build()
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoGraphDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
ScaleClient: &mockScaleClient{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
},
},
}
result, err := reconciler.reconcileGroveResources(ctx, dgd)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(result).To(gomega.Equal(tt.wantReconcileResult))
})
}
}
func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
dgdSpec v1alpha1.DynamoGraphDeploymentSpec
existingDCDs []client.Object
wantReconcileResult ReconcileResult
}{
{
name: "single service - DCD ready (Available condition = True)",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ServiceName: "frontend",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(2)),
},
},
},
existingDCDs: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "frontend",
Replicas: ptr.To(int32(2)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
AvailableReplicas: ptr.To(int32(2)),
},
},
},
},
wantReconcileResult: ReconcileResult{
State: ReadyState,
Reason: "all_resources_are_ready",
Message: "All resources are ready",
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
AvailableReplicas: ptr.To(int32(2)),
},
},
},
},
{
name: "single service - DCD not ready (Available condition = False)",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ServiceName: "frontend",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(2)),
},
},
},
existingDCDs: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "frontend",
Replicas: ptr.To(int32(2)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionFalse,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 2,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(0)),
},
},
},
},
wantReconcileResult: ReconcileResult{
State: PendingState,
Reason: "some_resources_are_not_ready",
Message: "Resources not ready: test-dgd-frontend: Component deployment not ready - Available condition not true",
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 2,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(0)),
},
},
},
},
{
name: "multiple services - all DCDs ready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ServiceName: "frontend",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(1)),
},
"decode": {
ServiceName: "decode",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: ptr.To(int32(2)),
},
"prefill": {
ServiceName: "prefill",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypePrefill),
Replicas: ptr.To(int32(3)),
},
},
},
existingDCDs: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "frontend",
Replicas: ptr.To(int32(1)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(1)),
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-decode",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "decode",
Replicas: ptr.To(int32(2)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment",
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
AvailableReplicas: ptr.To(int32(2)),
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-prefill",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "prefill",
Replicas: ptr.To(int32(3)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-prefill-deployment",
Replicas: 3,
UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)),
AvailableReplicas: ptr.To(int32(3)),
},
},
},
},
wantReconcileResult: ReconcileResult{
State: ReadyState,
Reason: "all_resources_are_ready",
Message: "All resources are ready",
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(1)),
},
"decode": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment",
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(2)),
AvailableReplicas: ptr.To(int32(2)),
},
"prefill": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-prefill-deployment",
Replicas: 3,
UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)),
AvailableReplicas: ptr.To(int32(3)),
},
},
},
},
{
name: "multiple services - some DCDs ready, some not ready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ServiceName: "frontend",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(1)),
},
"decode": {
ServiceName: "decode",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: ptr.To(int32(2)),
},
"prefill": {
ServiceName: "prefill",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypePrefill),
Replicas: ptr.To(int32(3)),
},
},
},
existingDCDs: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "frontend",
Replicas: ptr.To(int32(1)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(1)),
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-decode",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "decode",
Replicas: ptr.To(int32(2)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionFalse,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment",
Replicas: 2,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(0)),
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-prefill",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "prefill",
Replicas: ptr.To(int32(3)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionTrue,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-prefill-deployment",
Replicas: 3,
UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)),
AvailableReplicas: ptr.To(int32(3)),
},
},
},
},
wantReconcileResult: ReconcileResult{
State: PendingState,
Reason: "some_resources_are_not_ready",
Message: "Resources not ready: test-dgd-decode: Component deployment not ready - Available condition not true",
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(1)),
},
"decode": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment",
Replicas: 2,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(0)),
},
"prefill": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-prefill-deployment",
Replicas: 3,
UpdatedReplicas: 3,
ReadyReplicas: ptr.To(int32(3)),
AvailableReplicas: ptr.To(int32(3)),
},
},
},
},
{
name: "multiple services - all DCDs not ready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ServiceName: "frontend",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(1)),
},
"decode": {
ServiceName: "decode",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: ptr.To(int32(2)),
},
},
},
existingDCDs: []client.Object{
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-frontend",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "frontend",
Replicas: ptr.To(int32(1)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionFalse,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 1,
UpdatedReplicas: 0,
ReadyReplicas: ptr.To(int32(0)),
AvailableReplicas: ptr.To(int32(0)),
},
},
},
&v1alpha1.DynamoComponentDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-decode",
Namespace: "default",
},
Spec: v1alpha1.DynamoComponentDeploymentSpec{
BackendFramework: "vllm",
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "decode",
Replicas: ptr.To(int32(2)),
},
},
Status: v1alpha1.DynamoComponentDeploymentStatus{
Conditions: []metav1.Condition{
{
Type: v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
Status: metav1.ConditionFalse,
},
},
Service: v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment",
Replicas: 2,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(0)),
},
},
},
},
wantReconcileResult: ReconcileResult{
State: PendingState,
Reason: "some_resources_are_not_ready",
Message: "Resources not ready: test-dgd-decode: Component deployment not ready - Available condition not true; test-dgd-frontend: Component deployment not ready - Available condition not true",
ServiceStatus: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-frontend-deployment",
Replicas: 1,
UpdatedReplicas: 0,
ReadyReplicas: ptr.To(int32(0)),
AvailableReplicas: ptr.To(int32(0)),
},
"decode": {
ComponentKind: v1alpha1.ComponentKindDeployment,
ComponentName: "test-dgd-decode-deployment",
Replicas: 2,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
AvailableReplicas: ptr.To(int32(0)),
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
s := scheme.Scheme
err := v1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
dgd := &v1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd",
Namespace: "default",
},
Spec: tt.dgdSpec,
}
var objects []client.Object
objects = append(objects, dgd)
objects = append(objects, tt.existingDCDs...)
fakeKubeClient := fake.NewClientBuilder().
WithScheme(s).
WithObjects(objects...).
WithStatusSubresource(objects...).
Build()
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoGraphDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
}
result, err := reconciler.reconcileDynamoComponentsDeployments(ctx, dgd)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(result).To(gomega.Equal(tt.wantReconcileResult))
})
}
}
...@@ -515,21 +515,49 @@ func AppendUniqueImagePullSecrets(existing, additional []corev1.LocalObjectRefer ...@@ -515,21 +515,49 @@ func AppendUniqueImagePullSecrets(existing, additional []corev1.LocalObjectRefer
} }
type Resource struct { type Resource struct {
client.Object object client.Object
isReady func() (bool, string) isReady bool
readyReason string
serviceStatuses map[string]v1alpha1.ServiceReplicaStatus
} }
func WrapResource[T client.Object](resource T, isReady func() (bool, string)) *Resource { func NewResource[T client.Object](resource T, isReady func() (bool, string)) (*Resource, error) {
v := reflect.ValueOf(resource)
// handles untype nil and typed nil
if !v.IsValid() || v.IsNil() {
return nil, fmt.Errorf("resource is nil")
}
ready, reason := isReady()
return &Resource{ return &Resource{
Object: resource, object: resource,
isReady: isReady, isReady: ready,
readyReason: reason,
}, nil
}
func NewResourceWithServiceStatuses[T client.Object](resource T, isReadyAndServiceStatuses func() (bool, string, map[string]v1alpha1.ServiceReplicaStatus)) (*Resource, error) {
v := reflect.ValueOf(resource)
// handles untype nil and typed nil
if !v.IsValid() || v.IsNil() {
return nil, fmt.Errorf("resource is nil")
} }
ready, reason, serviceStatuses := isReadyAndServiceStatuses()
return &Resource{
object: resource,
isReady: ready,
readyReason: reason,
serviceStatuses: serviceStatuses,
}, nil
} }
func (r *Resource) IsReady() (bool, string) { func (r *Resource) IsReady() (bool, string) {
return r.isReady() return r.isReady, r.readyReason
} }
func (r *Resource) GetName() string { func (r *Resource) GetName() string {
return r.Object.GetName() return r.object.GetName()
}
func (r *Resource) GetServiceStatuses() map[string]v1alpha1.ServiceReplicaStatus {
return r.serviceStatuses
} }
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
...@@ -51,94 +52,116 @@ func (d *GroveMultinodeDeployer) GetHostNames(serviceName string, numberOfNodes ...@@ -51,94 +52,116 @@ func (d *GroveMultinodeDeployer) GetHostNames(serviceName string, numberOfNodes
return hostnames return hostnames
} }
// EvaluateAllComponentsReady determines if all Grove components are ready // GetComponentReadinessAndServiceReplicaStatuses determines if all Grove components are ready
// and returns the service replica statuses for each component.
// - PodCliques: spec.replicas == status.readyReplicas // - PodCliques: spec.replicas == status.readyReplicas
// - PodCliqueScalingGroups: spec.replicas == status.availableReplicas // - PodCliqueScalingGroups: spec.replicas == status.availableReplicas
func EvaluateAllComponentsReady(ctx context.Context, client client.Client, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) (bool, string) { func GetComponentReadinessAndServiceReplicaStatuses(ctx context.Context, client client.Client, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) (bool, string, map[string]v1alpha1.ServiceReplicaStatus) {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
var notReadyComponents []string var notReadyComponents []string
serviceStatuses := make(map[string]v1alpha1.ServiceReplicaStatus, len(dgd.Spec.Services))
for serviceName, component := range dgd.Spec.Services { for serviceName, component := range dgd.Spec.Services {
numberOfNodes := component.GetNumberOfNodes() isMultinode := component.GetNumberOfNodes() > 1
isMultinode := numberOfNodes > 1
resourceName := fmt.Sprintf("%s-0-%s", dgd.Name, strings.ToLower(serviceName)) resourceName := fmt.Sprintf("%s-0-%s", dgd.Name, strings.ToLower(serviceName))
if isMultinode { if isMultinode {
// Check PodCliqueScalingGroup: spec.replicas == status.availableReplicas // Check PodCliqueScalingGroup: spec.replicas == status.availableReplicas
if ok, reason := checkPCSGReady(ctx, client, resourceName, dgd.Namespace, logger); !ok { ok, reason, serviceStatus := checkPCSGReady(ctx, client, resourceName, dgd.Namespace, logger)
serviceStatuses[serviceName] = serviceStatus
if !ok {
notReadyComponents = append(notReadyComponents, fmt.Sprintf("pcsg/%s: %s", resourceName, reason)) notReadyComponents = append(notReadyComponents, fmt.Sprintf("pcsg/%s: %s", resourceName, reason))
} }
} else { } else {
// Check PodClique: spec.replicas == status.readyReplicas // Check PodClique: spec.replicas == status.readyReplicas
if ok, reason := checkPodCliqueReady(ctx, client, resourceName, dgd.Namespace, logger); !ok { ok, reason, serviceStatus := checkPodCliqueReady(ctx, client, resourceName, dgd.Namespace, logger)
serviceStatuses[serviceName] = serviceStatus
if !ok {
notReadyComponents = append(notReadyComponents, fmt.Sprintf("podclique/%s: %s", resourceName, reason)) notReadyComponents = append(notReadyComponents, fmt.Sprintf("podclique/%s: %s", resourceName, reason))
} }
} }
} }
if len(notReadyComponents) > 0 { if len(notReadyComponents) > 0 {
return false, strings.Join(notReadyComponents, "; ") return false, strings.Join(notReadyComponents, "; "), serviceStatuses
} }
return true, "" return true, "", serviceStatuses
} }
// checkPodCliqueReady checks if a PodClique has spec.replicas == status.readyReplicas // checkPodCliqueReady checks if a PodClique has spec.replicas == status.readyReplicas
func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string) { func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string, v1alpha1.ServiceReplicaStatus) {
podClique := &grovev1alpha1.PodClique{} podClique := &grovev1alpha1.PodClique{}
err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, podClique) err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, podClique)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
logger.V(2).Info("PodClique not found", "resourceName", resourceName) logger.V(2).Info("PodClique not found", "resourceName", resourceName)
return false, "resource not found" return false, "resource not found", v1alpha1.ServiceReplicaStatus{}
} }
logger.V(1).Info("Failed to get PodClique", "error", err, "resourceName", resourceName) logger.V(1).Info("Failed to get PodClique", "error", err, "resourceName", resourceName)
return false, fmt.Sprintf("get error: %v", err) return false, fmt.Sprintf("get error: %v", err), v1alpha1.ServiceReplicaStatus{}
} }
desiredReplicas := podClique.Spec.Replicas desiredReplicas := podClique.Spec.Replicas
readyReplicas := podClique.Status.ReadyReplicas readyReplicas := podClique.Status.ReadyReplicas
serviceStatus := v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: resourceName,
Replicas: podClique.Status.Replicas,
UpdatedReplicas: podClique.Status.UpdatedReplicas,
ReadyReplicas: &readyReplicas,
}
if desiredReplicas == 0 { if desiredReplicas == 0 {
// No replicas desired, so it's ready // No replicas desired, so it's ready
return true, "" return true, "", serviceStatus
} }
if desiredReplicas != readyReplicas { if desiredReplicas != readyReplicas {
logger.V(1).Info("PodClique not ready", "resourceName", resourceName, "desired", desiredReplicas, "ready", readyReplicas) logger.V(1).Info("PodClique not ready", "resourceName", resourceName, "desired", desiredReplicas, "ready", readyReplicas)
return false, fmt.Sprintf("desired=%d, ready=%d", desiredReplicas, readyReplicas) return false, fmt.Sprintf("desired=%d, ready=%d", desiredReplicas, readyReplicas), serviceStatus
} }
return true, "" return true, "", serviceStatus
} }
// checkPCSGReady checks if a PodCliqueScalingGroup has spec.replicas == status.availableReplicas // checkPCSGReady checks if a PodCliqueScalingGroup has spec.replicas == status.availableReplicas
func checkPCSGReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string) { func checkPCSGReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string, v1alpha1.ServiceReplicaStatus) {
pcsg := &grovev1alpha1.PodCliqueScalingGroup{} pcsg := &grovev1alpha1.PodCliqueScalingGroup{}
err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, pcsg) err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, pcsg)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
logger.V(2).Info("PodCliqueScalingGroup not found", "resourceName", resourceName) logger.V(2).Info("PodCliqueScalingGroup not found", "resourceName", resourceName)
return false, "resource not found" return false, "resource not found", v1alpha1.ServiceReplicaStatus{}
} }
logger.V(1).Info("Failed to get PodCliqueScalingGroup", "error", err, "resourceName", resourceName) logger.V(1).Info("Failed to get PodCliqueScalingGroup", "error", err, "resourceName", resourceName)
return false, fmt.Sprintf("get error: %v", err) return false, fmt.Sprintf("get error: %v", err), v1alpha1.ServiceReplicaStatus{}
} }
desiredReplicas := pcsg.Spec.Replicas desiredReplicas := pcsg.Spec.Replicas
availableReplicas := pcsg.Status.AvailableReplicas availableReplicas := pcsg.Status.AvailableReplicas
serviceStatus := v1alpha1.ServiceReplicaStatus{
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: resourceName,
Replicas: pcsg.Status.Replicas,
UpdatedReplicas: pcsg.Status.UpdatedReplicas,
AvailableReplicas: &availableReplicas,
}
if desiredReplicas == 0 { if desiredReplicas == 0 {
// No replicas desired, so it's ready // No replicas desired, so it's ready
return true, "" return true, "", serviceStatus
} }
if desiredReplicas != availableReplicas { if desiredReplicas != availableReplicas {
logger.V(1).Info("PodCliqueScalingGroup not ready", "resourceName", resourceName, "desired", desiredReplicas, "available", availableReplicas) logger.V(1).Info("PodCliqueScalingGroup not ready", "resourceName", resourceName, "desired", desiredReplicas, "available", availableReplicas)
return false, fmt.Sprintf("desired=%d, available=%d", desiredReplicas, availableReplicas) return false, fmt.Sprintf("desired=%d, available=%d", desiredReplicas, availableReplicas), serviceStatus
} }
return true, "" return true, "", serviceStatus
} }
// resolveKaiSchedulerQueueName extracts the queue name from annotations or returns default // resolveKaiSchedulerQueueName extracts the queue name from annotations or returns default
......
...@@ -6,14 +6,20 @@ import ( ...@@ -6,14 +6,20 @@ import (
"testing" "testing"
grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1" grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
v1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common" "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
dynamicfake "k8s.io/client-go/dynamic/fake" dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
) )
func TestResolveKaiSchedulerQueueName(t *testing.T) { func TestResolveKaiSchedulerQueueName(t *testing.T) {
...@@ -311,3 +317,328 @@ func TestEnsureQueueExists(t *testing.T) { ...@@ -311,3 +317,328 @@ func TestEnsureQueueExists(t *testing.T) {
}) })
} }
} }
func Test_GetComponentReadinessAndServiceReplicaStatuses(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
dgdSpec v1alpha1.DynamoGraphDeploymentSpec
existingGroveResources []client.Object
wantReady bool
wantReason string
wantServiceStatuses map[string]v1alpha1.ServiceReplicaStatus
}{
{
name: "single-node service not ready - PodClique not ready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ServiceName: "frontend",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(2)),
},
},
},
existingGroveResources: []client.Object{
&grovev1alpha1.PodClique{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-frontend",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueSpec{
Replicas: 2,
},
Status: grovev1alpha1.PodCliqueStatus{
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: 1,
},
},
},
wantReady: false,
wantReason: "podclique/test-dgd-0-frontend: desired=2, ready=1",
wantServiceStatuses: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-frontend",
Replicas: 2,
UpdatedReplicas: 2,
ReadyReplicas: ptr.To(int32(1)),
},
},
},
{
name: "all multinode services ready - all PCSGs ready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"decode": {
ServiceName: "decode",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: ptr.To(int32(2)),
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 2,
},
},
"prefill": {
ServiceName: "prefill",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypePrefill),
Replicas: ptr.To(int32(3)),
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 4,
},
},
},
},
existingGroveResources: []client.Object{
&grovev1alpha1.PodCliqueScalingGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-decode",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueScalingGroupSpec{
Replicas: 2,
},
Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 2,
},
},
&grovev1alpha1.PodCliqueScalingGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-prefill",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueScalingGroupSpec{
Replicas: 3,
},
Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 3,
UpdatedReplicas: 3,
AvailableReplicas: 3,
},
},
},
wantReady: true,
wantReason: "",
wantServiceStatuses: map[string]v1alpha1.ServiceReplicaStatus{
"decode": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-decode",
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: ptr.To(int32(2)),
},
"prefill": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-prefill",
Replicas: 3,
UpdatedReplicas: 3,
AvailableReplicas: ptr.To(int32(3)),
},
},
},
{
name: "multinode service not ready - PCSG not ready",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ServiceName: "worker",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeWorker),
Replicas: ptr.To(int32(2)),
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 4,
},
},
},
},
existingGroveResources: []client.Object{
&grovev1alpha1.PodCliqueScalingGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-worker",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueScalingGroupSpec{
Replicas: 2,
},
Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 1,
},
},
},
wantReady: false,
wantReason: "pcsg/test-dgd-0-worker: desired=2, available=1",
wantServiceStatuses: map[string]v1alpha1.ServiceReplicaStatus{
"worker": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-worker",
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: ptr.To(int32(1)),
},
},
},
{
name: "mixed services - some ready, some not - combination of PodClique and PCSG",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ServiceName: "frontend",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(1)),
},
"decode": {
ServiceName: "decode",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: ptr.To(int32(2)),
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 2,
},
},
"prefill": {
ServiceName: "prefill",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypePrefill),
Replicas: ptr.To(int32(2)),
Multinode: &v1alpha1.MultinodeSpec{
NodeCount: 2,
},
},
},
},
existingGroveResources: []client.Object{
&grovev1alpha1.PodClique{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-frontend",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueSpec{
Replicas: 1,
},
Status: grovev1alpha1.PodCliqueStatus{
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: 1,
},
},
&grovev1alpha1.PodCliqueScalingGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-decode",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueScalingGroupSpec{
Replicas: 2,
},
Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 1,
},
},
&grovev1alpha1.PodCliqueScalingGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd-0-prefill",
Namespace: "default",
},
Spec: grovev1alpha1.PodCliqueScalingGroupSpec{
Replicas: 2,
},
Status: grovev1alpha1.PodCliqueScalingGroupStatus{
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: 2,
},
},
},
wantReady: false,
wantReason: "pcsg/test-dgd-0-decode: desired=2, available=1",
wantServiceStatuses: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {
ComponentKind: v1alpha1.ComponentKindPodClique,
ComponentName: "test-dgd-0-frontend",
Replicas: 1,
UpdatedReplicas: 1,
ReadyReplicas: ptr.To(int32(1)),
},
"decode": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-decode",
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: ptr.To(int32(1)),
},
"prefill": {
ComponentKind: v1alpha1.ComponentKindPodCliqueScalingGroup,
ComponentName: "test-dgd-0-prefill",
Replicas: 2,
UpdatedReplicas: 2,
AvailableReplicas: ptr.To(int32(2)),
},
},
},
{
name: "service resource not found - PodClique missing",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"frontend": {
ServiceName: "frontend",
DynamoNamespace: ptr.To("default"),
ComponentType: string(commonconsts.ComponentTypeFrontend),
Replicas: ptr.To(int32(1)),
},
},
},
existingGroveResources: []client.Object{},
wantReady: false,
wantReason: "podclique/test-dgd-0-frontend: resource not found",
wantServiceStatuses: map[string]v1alpha1.ServiceReplicaStatus{
"frontend": {},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
s := scheme.Scheme
err := v1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
err = grovev1alpha1.AddToScheme(s)
g.Expect(err).NotTo(gomega.HaveOccurred())
dgd := &v1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-dgd",
Namespace: "default",
},
Spec: tt.dgdSpec,
}
var objects []client.Object
objects = append(objects, dgd)
objects = append(objects, tt.existingGroveResources...)
fakeKubeClient := fake.NewClientBuilder().
WithScheme(s).
WithObjects(objects...).
WithStatusSubresource(objects...).
Build()
ready, reason, serviceStatuses := GetComponentReadinessAndServiceReplicaStatuses(ctx, fakeKubeClient, dgd)
g.Expect(ready).To(gomega.Equal(tt.wantReady))
g.Expect(reason).To(gomega.Equal(tt.wantReason))
g.Expect(serviceStatuses).To(gomega.Equal(tt.wantServiceStatuses))
})
}
}
...@@ -67,6 +67,26 @@ _Appears in:_ ...@@ -67,6 +67,26 @@ _Appears in:_
#### ComponentKind
_Underlying type:_ _string_
ComponentKind represents the type of underlying Kubernetes resource.
_Validation:_
- Enum: [PodClique PodCliqueScalingGroup Deployment LeaderWorkerSet]
_Appears in:_
- [ServiceReplicaStatus](#servicereplicastatus)
| Field | Description |
| --- | --- |
| `PodClique` | ComponentKindPodClique represents a PodClique resource.<br /> |
| `PodCliqueScalingGroup` | ComponentKindPodCliqueScalingGroup represents a PodCliqueScalingGroup resource.<br /> |
| `Deployment` | ComponentKindDeployment represents a Deployment resource.<br /> |
| `LeaderWorkerSet` | ComponentKindLeaderWorkerSet represents a LeaderWorkerSet resource.<br /> |
#### ConfigMapKeySelector #### ConfigMapKeySelector
...@@ -430,6 +450,7 @@ _Appears in:_ ...@@ -430,6 +450,7 @@ _Appears in:_
| --- | --- | --- | --- | | --- | --- | --- | --- |
| `state` _string_ | State is a high-level textual status of the graph deployment lifecycle. | | | | `state` _string_ | State is a high-level textual status of the graph deployment lifecycle. | | |
| `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#condition-v1-meta) array_ | Conditions contains the latest observed conditions of the graph deployment.<br />The slice is merged by type on patch updates. | | | | `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#condition-v1-meta) array_ | Conditions contains the latest observed conditions of the graph deployment.<br />The slice is merged by type on patch updates. | | |
| `services` _object (keys:string, values:[ServiceReplicaStatus](#servicereplicastatus))_ | Services contains per-service replica status information.<br />The map key is the service name from spec.services. | | |
#### DynamoModel #### DynamoModel
...@@ -739,6 +760,27 @@ _Appears in:_ ...@@ -739,6 +760,27 @@ _Appears in:_
| `disable` _boolean_ | Disable indicates whether the ScalingAdapter should be disabled for this service.<br />When false (default), a DGDSA is created and owns the replicas field.<br />When true, no DGDSA is created and replicas can be modified directly in the DGD. | false | | | `disable` _boolean_ | Disable indicates whether the ScalingAdapter should be disabled for this service.<br />When false (default), a DGDSA is created and owns the replicas field.<br />When true, no DGDSA is created and replicas can be modified directly in the DGD. | false | |
#### ServiceReplicaStatus
ServiceReplicaStatus contains replica information for a single service.
_Appears in:_
- [DynamoGraphDeploymentStatus](#dynamographdeploymentstatus)
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `componentKind` _[ComponentKind](#componentkind)_ | ComponentKind is the underlying resource kind (e.g., "PodClique", "PodCliqueScalingGroup", "Deployment", "LeaderWorkerSet"). | | Enum: [PodClique PodCliqueScalingGroup Deployment LeaderWorkerSet] <br /> |
| `componentName` _string_ | ComponentName is the name of the underlying resource. | | |
| `replicas` _integer_ | Replicas is the total number of non-terminated replicas.<br />Required for all component kinds. | | Minimum: 0 <br /> |
| `updatedReplicas` _integer_ | UpdatedReplicas is the number of replicas at the current/desired revision.<br />Required for all component kinds. | | Minimum: 0 <br /> |
| `readyReplicas` _integer_ | ReadyReplicas is the number of ready replicas.<br />Populated for PodClique, Deployment, and LeaderWorkerSet.<br />Not available for PodCliqueScalingGroup.<br />When nil, the field is omitted from the API response. | | Minimum: 0 <br /> |
| `availableReplicas` _integer_ | AvailableReplicas is the number of available replicas.<br />For Deployment: replicas ready for >= minReadySeconds.<br />For PodCliqueScalingGroup: replicas where all constituent PodCliques have >= MinAvailable ready pods.<br />Not available for PodClique or LeaderWorkerSet.<br />When nil, the field is omitted from the API response. | | Minimum: 0 <br /> |
#### SharedMemorySpec #### SharedMemorySpec
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment