Unverified Commit 60a6a96a authored by julienmancuso's avatar julienmancuso Committed by GitHub
Browse files

fix: Reflect actual status of Grove PGS in Dynamo DGD (#2710)


Signed-off-by: default avatarJulien Mancuso <jmancuso@nvidia.com>
parent f99b569d
......@@ -202,17 +202,18 @@ func init() {
SchemeBuilder.Register(&DynamoComponentDeployment{}, &DynamoComponentDeploymentList{})
}
func (s *DynamoComponentDeployment) IsReady() bool {
return s.Status.IsReady()
func (s *DynamoComponentDeployment) IsReady() (bool, string) {
ready, reason := s.Status.IsReady()
return ready, reason
}
func (s *DynamoComponentDeploymentStatus) IsReady() bool {
func (s *DynamoComponentDeploymentStatus) IsReady() (bool, string) {
for _, condition := range s.Conditions {
if condition.Type == DynamoGraphDeploymentConditionTypeAvailable && condition.Status == metav1.ConditionTrue {
return true
return true, ""
}
}
return false
return false, "Component deployment not ready - Available condition not true"
}
func (s *DynamoComponentDeployment) GetSpec() any {
......
......@@ -6,7 +6,7 @@ toolchain go1.24.3
require (
emperror.dev/errors v0.8.1
github.com/NVIDIA/grove/operator/api v0.0.0-20250801123021-8b42bac59ef2
github.com/NVIDIA/grove/operator/api v0.0.0-20250825164137-da01400261a6
github.com/bsm/gomega v1.27.10
github.com/google/go-cmp v0.7.0
github.com/imdario/mergo v0.3.6
......
emperror.dev/errors v0.8.1 h1:UavXZ5cSX/4u9iyvH6aDcuGkVjeexUGJ7Ij7G4VfQT0=
emperror.dev/errors v0.8.1/go.mod h1:YcRvLPh626Ubn2xqtoprejnA5nFha+TJ+2vew48kWuE=
github.com/NVIDIA/grove/operator/api v0.0.0-20250801123021-8b42bac59ef2 h1:JLOj0GiubP3VlR0okIbuqljvl+e2Vccnu6LX6wL34G0=
github.com/NVIDIA/grove/operator/api v0.0.0-20250801123021-8b42bac59ef2/go.mod h1:QlsR2wQLj9m/zVEqv5SsCPzyjN2ykYZ0r/NEnDf4WB4=
github.com/NVIDIA/grove/operator/api v0.0.0-20250825164137-da01400261a6 h1:JkW8LeRVsQH/YkRTz80T/JxlDgfk0URKgTUKyYKxbso=
github.com/NVIDIA/grove/operator/api v0.0.0-20250825164137-da01400261a6/go.mod h1:QlsR2wQLj9m/zVEqv5SsCPzyjN2ykYZ0r/NEnDf4WB4=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
......
......@@ -105,7 +105,6 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
reason := Reason("undefined")
message := Message("")
state := PendingState
readyStatus := metav1.ConditionFalse
// retrieve the CRD
dynamoDeployment := &nvidiacomv1alpha1.DynamoGraphDeployment{}
if err = r.Get(ctx, req.NamespacedName, dynamoDeployment); err != nil {
......@@ -123,10 +122,13 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
logger.Error(err, "Reconciliation failed")
}
dynamoDeployment.SetState(string(state))
readyStatus := metav1.ConditionFalse
if state == ReadyState {
readyStatus = metav1.ConditionTrue
}
// update the CRD status condition
// Update Ready condition
dynamoDeployment.AddStatusCondition(metav1.Condition{
Type: "Ready",
Status: readyStatus,
......@@ -134,6 +136,7 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
Message: string(message),
LastTransitionTime: metav1.Now(),
})
err = r.Status().Update(ctx, dynamoDeployment)
if err != nil {
logger.Error(err, "Unable to update the CRD status", "crd", req.NamespacedName)
......@@ -160,7 +163,7 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
}
type Resource interface {
IsReady() bool
IsReady() (ready bool, reason string)
GetName() string
}
......@@ -267,12 +270,17 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
logger.Error(err, "failed to sync the Grove GangSet")
return "", "", "", fmt.Errorf("failed to sync the Grove GangSet: %w", err)
}
groveGangSetAsResource := commonController.WrapResource(syncedGroveGangSet, func() bool {
if syncedGroveGangSet.Status.LastOperation != nil && syncedGroveGangSet.Status.LastOperation.State == grovev1alpha1.LastOperationStateSucceeded {
return true
}
return false
})
groveGangSetAsResource := commonController.WrapResource(
syncedGroveGangSet,
func() (bool, string) {
// Grove readiness: all underlying PodCliques and PodCliqueScalingGroups have replicas == availableReplicas
allComponentsReady, reason := dynamo.EvaluateAllComponentsReady(ctx, r.Client, dynamoDeployment)
if !allComponentsReady {
return false, reason
}
return true, ""
},
)
// Handle Grove scaling operations after structural changes
if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil {
......@@ -296,9 +304,10 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
logger.Error(err, "failed to sync the main component service")
return "", "", "", fmt.Errorf("failed to sync the main component service: %w", err)
}
mainComponentServiceAsResource := commonController.WrapResource(syncedMainComponentService, func() bool {
return true
})
mainComponentServiceAsResource := commonController.WrapResource(syncedMainComponentService,
func() (bool, string) {
return true, ""
})
resources = append(resources, mainComponentServiceAsResource)
// generate the main component ingress
ingressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
......@@ -317,9 +326,10 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
logger.Error(err, "failed to sync the main component ingress")
return "", "", "", fmt.Errorf("failed to sync the main component ingress: %w", err)
}
resources = append(resources, commonController.WrapResource(syncedMainComponentIngress, func() bool {
return true
}))
resources = append(resources, commonController.WrapResource(syncedMainComponentIngress,
func() (bool, string) {
return true, ""
}))
// generate the main component virtual service
if r.Config.IngressConfig.UseVirtualService() {
mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
......@@ -334,9 +344,10 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
logger.Error(err, "failed to sync the main component virtual service")
return "", "", "", fmt.Errorf("failed to sync the main component virtual service: %w", err)
}
resources = append(resources, commonController.WrapResource(syncedMainComponentVirtualService, func() bool {
return true
}))
resources = append(resources, commonController.WrapResource(syncedMainComponentVirtualService,
func() (bool, string) {
return true, ""
}))
}
}
}
......@@ -344,16 +355,21 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
}
func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) (State, Reason, Message, error) {
var notReadyReasons []string
notReadyResources := []string{}
for _, resource := range resources {
if !resource.IsReady() {
ready, reason := resource.IsReady()
if !ready {
notReadyResources = append(notReadyResources, resource.GetName())
notReadyReasons = append(notReadyReasons, fmt.Sprintf("%s: %s", resource.GetName(), reason))
}
}
if len(notReadyResources) == 0 {
return ReadyState, "all_resources_are_ready", Message("All resources are ready"), nil
}
return PendingState, "some_resources_are_not_ready", Message(fmt.Sprintf("%d resources not ready: %v", len(notReadyResources), notReadyResources)), nil
return PendingState, "some_resources_are_not_ready", Message(fmt.Sprintf("Resources not ready: %s", strings.Join(notReadyReasons, "; "))), nil
}
func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
......
......@@ -473,16 +473,20 @@ func GetResourcesConfig(resources *common.Resources) (*corev1.ResourceRequiremen
type Resource struct {
client.Object
isReady func() bool
isReady func() (bool, string)
}
func WrapResource[T client.Object](resource T, isReady func() bool) *Resource {
func WrapResource[T client.Object](resource T, isReady func() (bool, string)) *Resource {
return &Resource{
Object: resource,
isReady: isReady,
}
}
func (r *Resource) IsReady() bool {
func (r *Resource) IsReady() (bool, string) {
return r.isReady()
}
func (r *Resource) GetName() string {
return r.Object.GetName()
}
......@@ -4523,7 +4523,7 @@ func TestGenerateBasePodSpec_Worker(t *testing.T) {
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{
Medium: corev1.StorageMediumMemory,
SizeLimit: func() *resource.Quantity { q := resource.MustParse("8Gi"); return &q }(),
SizeLimit: func() *resource.Quantity { q := resource.MustParse(commonconsts.DefaultSharedMemorySize); return &q }(),
},
},
},
......
package dynamo
import (
"context"
"fmt"
"strings"
grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
)
......@@ -30,3 +40,93 @@ func (d *GroveMultinodeDeployer) GetHostNames(serviceName string, numberOfNodes
}
return hostnames
}
// EvaluateAllComponentsReady determines if all Grove components are ready
// - PodCliques: spec.replicas == status.readyReplicas
// - PodCliqueScalingGroups: spec.replicas == status.availableReplicas
func EvaluateAllComponentsReady(ctx context.Context, client client.Client, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) (bool, string) {
logger := log.FromContext(ctx)
var notReadyComponents []string
for serviceName, component := range dgd.Spec.Services {
numberOfNodes := component.GetNumberOfNodes()
isMultinode := numberOfNodes > 1
resourceName := fmt.Sprintf("%s-0-%s", dgd.Name, strings.ToLower(serviceName))
if isMultinode {
// Check PodCliqueScalingGroup: spec.replicas == status.availableReplicas
if ok, reason := checkPCSGReady(ctx, client, resourceName, dgd.Namespace, logger); !ok {
notReadyComponents = append(notReadyComponents, fmt.Sprintf("pcsg/%s: %s", resourceName, reason))
}
} else {
// Check PodClique: spec.replicas == status.readyReplicas
if ok, reason := checkPodCliqueReady(ctx, client, resourceName, dgd.Namespace, logger); !ok {
notReadyComponents = append(notReadyComponents, fmt.Sprintf("podclique/%s: %s", resourceName, reason))
}
}
}
if len(notReadyComponents) > 0 {
return false, strings.Join(notReadyComponents, "; ")
}
return true, ""
}
// checkPodCliqueReady checks if a PodClique has spec.replicas == status.readyReplicas
func checkPodCliqueReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string) {
podClique := &grovev1alpha1.PodClique{}
err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, podClique)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("PodClique not found", "resourceName", resourceName)
return false, "resource not found"
}
logger.V(1).Info("Failed to get PodClique", "error", err, "resourceName", resourceName)
return false, fmt.Sprintf("get error: %v", err)
}
desiredReplicas := podClique.Spec.Replicas
readyReplicas := podClique.Status.ReadyReplicas
if desiredReplicas == 0 {
// No replicas desired, so it's ready
return true, ""
}
if desiredReplicas != readyReplicas {
logger.V(1).Info("PodClique not ready", "resourceName", resourceName, "desired", desiredReplicas, "ready", readyReplicas)
return false, fmt.Sprintf("desired=%d, ready=%d", desiredReplicas, readyReplicas)
}
return true, ""
}
// checkPCSGReady checks if a PodCliqueScalingGroup has spec.replicas == status.availableReplicas
func checkPCSGReady(ctx context.Context, client client.Client, resourceName, namespace string, logger logr.Logger) (bool, string) {
pcsg := &grovev1alpha1.PodCliqueScalingGroup{}
err := client.Get(ctx, types.NamespacedName{Name: resourceName, Namespace: namespace}, pcsg)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("PodCliqueScalingGroup not found", "resourceName", resourceName)
return false, "resource not found"
}
logger.V(1).Info("Failed to get PodCliqueScalingGroup", "error", err, "resourceName", resourceName)
return false, fmt.Sprintf("get error: %v", err)
}
desiredReplicas := pcsg.Spec.Replicas
availableReplicas := pcsg.Status.AvailableReplicas
if desiredReplicas == 0 {
// No replicas desired, so it's ready
return true, ""
}
if desiredReplicas != availableReplicas {
logger.V(1).Info("PodCliqueScalingGroup not ready", "resourceName", resourceName, "desired", desiredReplicas, "available", availableReplicas)
return false, fmt.Sprintf("desired=%d, available=%d", desiredReplicas, availableReplicas)
}
return true, ""
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment