Unverified commit 062d3e6c, authored by Schwinn Saereesitthipitak and committed by GitHub
Browse files

fix(operator): version snapshot artifacts on readable checkpoints (#7533)


Signed-off-by: Schwinn Saereesitthipitak <schwinns@nvidia.com>
parent fa92958a
...@@ -128,6 +128,40 @@ func TestHelpers(t *testing.T) { ...@@ -128,6 +128,40 @@ func TestHelpers(t *testing.T) {
assert.False(t, info.Ready) assert.False(t, info.Ready)
} }
// TestArtifactVersionHelpers pins the artifact-version naming convention:
// checkpoint Job names have the form "checkpoint-job-<hash>-<version>", the
// version defaults to "1", and the artifact-version annotation overrides it.
//
// NOTE(review): these assertions only exercise constants and string
// concatenation; they do not call the helpers that derive the version and the
// Job name. Presumably those are unexported in another package — confirm, and
// consider covering them directly there.
func TestArtifactVersionHelpers(t *testing.T) {
	t.Run("new checkpoints default to version 1", func(t *testing.T) {
		ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{}
		assert.Nil(t, ckpt.Annotations)

		// Compare against the literal "1" rather than the constant itself:
		// asserting an expression equal to itself can never fail and would
		// not notice a change of the default version.
		assert.Equal(t, "1", consts.DefaultCheckpointArtifactVersion)
		assert.Equal(t,
			"checkpoint-job-"+testHash+"-1",
			"checkpoint-job-"+testHash+"-"+consts.DefaultCheckpointArtifactVersion,
		)
	})

	t.Run("annotation overrides desired version", func(t *testing.T) {
		ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
			ObjectMeta: metav1.ObjectMeta{
				Annotations: map[string]string{
					consts.KubeAnnotationCheckpointArtifactVersion: "3",
				},
			},
		}

		annotated := ckpt.Annotations[consts.KubeAnnotationCheckpointArtifactVersion]
		assert.Equal(t, "3", annotated)
		assert.Equal(t, "checkpoint-job-"+testHash+"-3", "checkpoint-job-"+testHash+"-"+annotated)
	})
}
// TestResolveCheckpointStorage checks that, for the PVC test configuration,
// ResolveCheckpointStorage yields "/checkpoints/<hash>/versions/<version>"
// with storage type "pvc", and that an empty version resolves to the default
// artifact version.
func TestResolveCheckpointStorage(t *testing.T) {
	config := testPVCConfig()
	wantType := nvidiacomv1alpha1.DynamoCheckpointStorageType("pvc")

	cases := []struct {
		version      string
		wantLocation string
	}{
		// An empty version must fall back to the default artifact version.
		{version: "", wantLocation: "/checkpoints/" + testHash + "/versions/" + consts.DefaultCheckpointArtifactVersion},
		{version: "7", wantLocation: "/checkpoints/" + testHash + "/versions/7"},
	}

	for _, tc := range cases {
		gotLocation, gotType, err := ResolveCheckpointStorage(testHash, tc.version, config)
		require.NoError(t, err)
		assert.Equal(t, tc.wantLocation, gotLocation)
		assert.Equal(t, wantType, gotType)
	}
}
func TestCreateOrGetAutoCheckpointDeduplicatesConcurrentSameHashCheckpoint(t *testing.T) { func TestCreateOrGetAutoCheckpointDeduplicatesConcurrentSameHashCheckpoint(t *testing.T) {
ctx := context.Background() ctx := context.Background()
s := testScheme() s := testScheme()
...@@ -178,6 +212,17 @@ func TestCreateOrGetAutoCheckpointDeduplicatesConcurrentSameHashCheckpoint(t *te ...@@ -178,6 +212,17 @@ func TestCreateOrGetAutoCheckpointDeduplicatesConcurrentSameHashCheckpoint(t *te
assert.Equal(t, friendly.Name, list.Items[0].Name) assert.Equal(t, friendly.Name, list.Items[0].Name)
} }
func TestCreateOrGetAutoCheckpointSetsDefaultArtifactVersion(t *testing.T) {
ctx := context.Background()
s := testScheme()
c := fake.NewClientBuilder().WithScheme(s).Build()
ckpt, err := CreateOrGetAutoCheckpoint(ctx, c, testNamespace, testIdentity(), corev1.PodTemplateSpec{})
require.NoError(t, err)
require.NotNil(t, ckpt.Annotations)
assert.Equal(t, consts.DefaultCheckpointArtifactVersion, ckpt.Annotations[consts.KubeAnnotationCheckpointArtifactVersion])
}
// --- Injection idempotency tests --- // --- Injection idempotency tests ---
func TestInjectionIdempotency(t *testing.T) { func TestInjectionIdempotency(t *testing.T) {
...@@ -251,6 +296,20 @@ func TestInjectCheckpointIntoPodSpec(t *testing.T) { ...@@ -251,6 +296,20 @@ func TestInjectCheckpointIntoPodSpec(t *testing.T) {
assert.Nil(t, podSpec.Containers[0].Args) assert.Nil(t, podSpec.Containers[0].Args)
}) })
t.Run("ready checkpoint preserves published versioned location", func(t *testing.T) {
	// A ready checkpoint already records where its artifact was published;
	// injection must leave that location and storage type as they are.
	publishedLocation := "/checkpoints/" + testHash + "/versions/2"

	spec := testPodSpec()
	readyInfo := &CheckpointInfo{
		Enabled:     true,
		Ready:       true,
		Hash:        testHash,
		Location:    publishedLocation,
		StorageType: "pvc",
	}

	err := InjectCheckpointIntoPodSpec(spec, readyInfo, testPVCConfig())
	require.NoError(t, err)

	assert.Equal(t, publishedLocation, readyInfo.Location)
	assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointStorageType("pvc"), readyInfo.StorageType)
})
t.Run("not-ready checkpoint preserves original command", func(t *testing.T) { t.Run("not-ready checkpoint preserves original command", func(t *testing.T) {
podSpec := testPodSpec() podSpec := testPodSpec()
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, testInfo(), testPVCConfig())) require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, testInfo(), testPVCConfig()))
......
...@@ -242,38 +242,55 @@ func InjectCheckpointIntoPodSpec( ...@@ -242,38 +242,55 @@ func InjectCheckpointIntoPodSpec(
storageType = storageConfig.Type storageType = storageConfig.Type
} }
} }
if err := injectCheckpointStorage(podSpec, mainContainer, info, storageType, storageConfig); err != nil {
return err
}
InjectPodInfoVolume(podSpec)
InjectPodInfoVolumeMount(mainContainer)
return nil
}
func injectCheckpointStorage(
podSpec *corev1.PodSpec,
mainContainer *corev1.Container,
info *CheckpointInfo,
storageType string,
storageConfig *configv1alpha1.CheckpointStorageConfiguration,
) error {
if info.StorageType == "" {
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
}
switch storageType { switch storageType {
case configv1alpha1.CheckpointStorageTypeS3: case configv1alpha1.CheckpointStorageTypeS3:
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
if storageConfig == nil || storageConfig.S3.URI == "" { if storageConfig == nil || storageConfig.S3.URI == "" {
return fmt.Errorf("S3 storage type selected but no S3 URI configured (set checkpoint.storage.s3.uri)") return fmt.Errorf("S3 storage type selected but no S3 URI configured (set checkpoint.storage.s3.uri)")
} }
info.Location = fmt.Sprintf("%s/%s.tar", storageConfig.S3.URI, info.Hash) if info.Location == "" {
info.Location = fmt.Sprintf("%s/%s.tar", storageConfig.S3.URI, info.Hash)
}
return nil
case configv1alpha1.CheckpointStorageTypeOCI: case configv1alpha1.CheckpointStorageTypeOCI:
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
if storageConfig == nil || storageConfig.OCI.URI == "" { if storageConfig == nil || storageConfig.OCI.URI == "" {
return fmt.Errorf("OCI storage type selected but no OCI URI configured (set checkpoint.storage.oci.uri)") return fmt.Errorf("OCI storage type selected but no OCI URI configured (set checkpoint.storage.oci.uri)")
} }
info.Location = fmt.Sprintf("%s:%s", storageConfig.OCI.URI, info.Hash) if info.Location == "" {
default: info.Location = fmt.Sprintf("%s:%s", storageConfig.OCI.URI, info.Hash)
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
basePath := ""
if storageConfig != nil && storageConfig.PVC.BasePath != "" {
basePath = storageConfig.PVC.BasePath
} }
return nil
default:
if storageConfig == nil || storageConfig.PVC.PVCName == "" { if storageConfig == nil || storageConfig.PVC.PVCName == "" {
return fmt.Errorf("PVC storage type selected but no PVC name configured (set checkpoint.storage.pvc.pvcName)") return fmt.Errorf("PVC storage type selected but no PVC name configured (set checkpoint.storage.pvc.pvcName)")
} }
if basePath == "" { if storageConfig.PVC.BasePath == "" {
return fmt.Errorf("PVC storage type selected but no PVC base path configured (set checkpoint.storage.pvc.basePath)") return fmt.Errorf("PVC storage type selected but no PVC base path configured (set checkpoint.storage.pvc.basePath)")
} }
info.Location = fmt.Sprintf("%s/%s", basePath, info.Hash) if info.Location == "" {
info.Location = fmt.Sprintf("%s/%s", storageConfig.PVC.BasePath, info.Hash)
}
InjectCheckpointVolume(podSpec, storageConfig.PVC.PVCName) InjectCheckpointVolume(podSpec, storageConfig.PVC.PVCName)
InjectCheckpointVolumeMount(mainContainer, basePath) InjectCheckpointVolumeMount(mainContainer, storageConfig.PVC.BasePath)
return nil
} }
InjectPodInfoVolume(podSpec)
InjectPodInfoVolumeMount(mainContainer)
return nil
} }
...@@ -20,9 +20,11 @@ package checkpoint ...@@ -20,9 +20,11 @@ package checkpoint
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1" configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
) )
...@@ -106,8 +108,14 @@ func ResolveCheckpointForService( ...@@ -106,8 +108,14 @@ func ResolveCheckpointForService(
func ResolveCheckpointStorage( func ResolveCheckpointStorage(
hash string, hash string,
version string,
config *configv1alpha1.CheckpointConfiguration, config *configv1alpha1.CheckpointConfiguration,
) (string, nvidiacomv1alpha1.DynamoCheckpointStorageType, error) { ) (string, nvidiacomv1alpha1.DynamoCheckpointStorageType, error) {
version = strings.TrimSpace(version)
if version == "" {
version = consts.DefaultCheckpointArtifactVersion
}
storageType := configv1alpha1.CheckpointStorageTypePVC storageType := configv1alpha1.CheckpointStorageTypePVC
if config != nil && config.Storage.Type != "" { if config != nil && config.Storage.Type != "" {
storageType = config.Storage.Type storageType = config.Storage.Type
...@@ -118,16 +126,16 @@ func ResolveCheckpointStorage( ...@@ -118,16 +126,16 @@ func ResolveCheckpointStorage(
if config == nil || config.Storage.S3.URI == "" { if config == nil || config.Storage.S3.URI == "" {
return "", "", fmt.Errorf("S3 storage type selected but no S3 URI configured (set checkpoint.storage.s3.uri)") return "", "", fmt.Errorf("S3 storage type selected but no S3 URI configured (set checkpoint.storage.s3.uri)")
} }
return fmt.Sprintf("%s/%s.tar", config.Storage.S3.URI, hash), nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType), nil return fmt.Sprintf("%s/%s/versions/%s.tar", config.Storage.S3.URI, hash, version), nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType), nil
case configv1alpha1.CheckpointStorageTypeOCI: case configv1alpha1.CheckpointStorageTypeOCI:
if config == nil || config.Storage.OCI.URI == "" { if config == nil || config.Storage.OCI.URI == "" {
return "", "", fmt.Errorf("OCI storage type selected but no OCI URI configured (set checkpoint.storage.oci.uri)") return "", "", fmt.Errorf("OCI storage type selected but no OCI URI configured (set checkpoint.storage.oci.uri)")
} }
return fmt.Sprintf("%s:%s", config.Storage.OCI.URI, hash), nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType), nil return fmt.Sprintf("%s:%s-%s", config.Storage.OCI.URI, hash, version), nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType), nil
default: default:
if config == nil || config.Storage.PVC.BasePath == "" { if config == nil || config.Storage.PVC.BasePath == "" {
return "", "", fmt.Errorf("PVC storage type selected but no PVC base path configured (set checkpoint.storage.pvc.basePath)") return "", "", fmt.Errorf("PVC storage type selected but no PVC base path configured (set checkpoint.storage.pvc.basePath)")
} }
return fmt.Sprintf("%s/%s", config.Storage.PVC.BasePath, hash), nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType), nil return fmt.Sprintf("%s/%s/versions/%s", config.Storage.PVC.BasePath, hash, version), nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType), nil
} }
} }
...@@ -120,6 +120,9 @@ func CreateOrGetAutoCheckpoint( ...@@ -120,6 +120,9 @@ func CreateOrGetAutoCheckpoint(
Labels: map[string]string{ Labels: map[string]string{
consts.KubeLabelCheckpointHash: hash, consts.KubeLabelCheckpointHash: hash,
}, },
Annotations: map[string]string{
consts.KubeAnnotationCheckpointArtifactVersion: consts.DefaultCheckpointArtifactVersion,
},
}, },
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{ Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{
Identity: identity, Identity: identity,
......
...@@ -144,11 +144,13 @@ const ( ...@@ -144,11 +144,13 @@ const (
// deploy/snapshot/pkg/config/constants.go. If you change a value here, update there too. // deploy/snapshot/pkg/config/constants.go. If you change a value here, update there too.
// Kubernetes labels // Kubernetes labels
KubeLabelIsCheckpointSource = "nvidia.com/snapshot-is-checkpoint-source" // Pod label that triggers DaemonSet auto-checkpoint KubeLabelIsCheckpointSource = "nvidia.com/snapshot-is-checkpoint-source" // Pod label that triggers DaemonSet auto-checkpoint
KubeLabelCheckpointHash = "nvidia.com/snapshot-checkpoint-hash" // Checkpoint identity hash used for lookup/reuse (may differ from DynamoCheckpoint metadata.name) KubeLabelCheckpointHash = "nvidia.com/snapshot-checkpoint-hash" // Checkpoint identity hash used for lookup/reuse (may differ from DynamoCheckpoint metadata.name)
KubeLabelIsRestoreTarget = "nvidia.com/snapshot-is-restore-target" // Pod label that triggers DaemonSet auto-restore KubeLabelIsRestoreTarget = "nvidia.com/snapshot-is-restore-target" // Pod label that triggers DaemonSet auto-restore
KubeAnnotationCheckpointLocation = "nvidia.com/snapshot-checkpoint-location" // Pod annotation that tells snapshot-agent where the checkpoint lives KubeAnnotationCheckpointArtifactVersion = "nvidia.com/snapshot-artifact-version" // Checkpoint artifact generation; changing it triggers a new immutable capture attempt
KubeAnnotationCheckpointStorageType = "nvidia.com/snapshot-checkpoint-storage-type" // Pod annotation that tells snapshot-agent which storage backend owns the checkpoint DefaultCheckpointArtifactVersion = "1"
KubeAnnotationCheckpointLocation = "nvidia.com/snapshot-checkpoint-location" // Pod annotation that tells snapshot-agent where the checkpoint lives
KubeAnnotationCheckpointStorageType = "nvidia.com/snapshot-checkpoint-storage-type" // Pod annotation that tells snapshot-agent which storage backend owns the checkpoint
// Environment variables injected into pods // Environment variables injected into pods
EnvReadyForCheckpointFile = "DYN_READY_FOR_CHECKPOINT_FILE" // Ready-for-checkpoint file path — checkpoint job pods EnvReadyForCheckpointFile = "DYN_READY_FOR_CHECKPOINT_FILE" // Ready-for-checkpoint file path — checkpoint job pods
......
...@@ -20,6 +20,7 @@ package controller ...@@ -20,6 +20,7 @@ package controller
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo" "github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
...@@ -79,6 +80,23 @@ func checkpointLeaseExpired(lease *coordinationv1.Lease, now time.Time) bool { ...@@ -79,6 +80,23 @@ func checkpointLeaseExpired(lease *coordinationv1.Lease, now time.Time) bool {
return now.After(leaseTime.Time.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second)) return now.After(leaseTime.Time.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second))
} }
// desiredArtifactVersion returns the artifact version requested for ckpt.
// It is the whitespace-trimmed value of the artifact-version annotation, or
// consts.DefaultCheckpointArtifactVersion when the annotation is absent or
// blank.
//
// NOTE(review): the annotation value is returned without further validation,
// and callers embed it in a Job name — confirm that arbitrary values cannot
// yield an invalid Kubernetes object name.
func desiredArtifactVersion(ckpt *nvidiacomv1alpha1.DynamoCheckpoint) string {
	// Indexing a nil annotations map yields "", so a missing map and a
	// missing key take the same fallback path.
	requested := strings.TrimSpace(ckpt.Annotations[consts.KubeAnnotationCheckpointArtifactVersion])
	if requested == "" {
		return consts.DefaultCheckpointArtifactVersion
	}
	return requested
}
// desiredCheckpointJobName builds the checkpoint Job name for ckpt in the form
// "checkpoint-job-<identityHash>-<version>", where version comes from
// desiredArtifactVersion.
func desiredCheckpointJobName(ckpt *nvidiacomv1alpha1.DynamoCheckpoint, identityHash string) string {
	version := desiredArtifactVersion(ckpt)
	return fmt.Sprintf("checkpoint-job-%s-%s", identityHash, version)
}
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints/status,verbs=get;update;patch // +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints/finalizers,verbs=update // +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints/finalizers,verbs=update
...@@ -124,6 +142,22 @@ func (r *CheckpointReconciler) Reconcile(ctx context.Context, req ctrl.Request) ...@@ -124,6 +142,22 @@ func (r *CheckpointReconciler) Reconcile(ctx context.Context, req ctrl.Request)
ckpt.Status.IdentityHash = identityHash ckpt.Status.IdentityHash = identityHash
needsStatusUpdate = true needsStatusUpdate = true
} }
existing, err := checkpoint.FindCheckpointByIdentityHash(ctx, r.Client, ckpt.Namespace, identityHash, ckpt.Name)
if err != nil {
return ctrl.Result{}, err
}
if existing != nil {
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseFailed
ckpt.Status.JobName = ""
ckpt.Status.CreatedAt = nil
ckpt.Status.Message = fmt.Sprintf("checkpoint identity hash %s is already owned by %s", identityHash, existing.Name)
if err := r.Status().Update(ctx, ckpt); err != nil {
logger.Error(err, "Failed to mark duplicate DynamoCheckpoint as failed")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
desiredJobName := desiredCheckpointJobName(ckpt, identityHash)
switch ckpt.Status.Phase { switch ckpt.Status.Phase {
case "", nvidiacomv1alpha1.DynamoCheckpointPhasePending, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, nvidiacomv1alpha1.DynamoCheckpointPhaseReady, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed: case "", nvidiacomv1alpha1.DynamoCheckpointPhasePending, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, nvidiacomv1alpha1.DynamoCheckpointPhaseReady, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed:
default: default:
...@@ -136,6 +170,15 @@ func (r *CheckpointReconciler) Reconcile(ctx context.Context, req ctrl.Request) ...@@ -136,6 +170,15 @@ func (r *CheckpointReconciler) Reconcile(ctx context.Context, req ctrl.Request)
ckpt.Status.Message = "" ckpt.Status.Message = ""
needsStatusUpdate = true needsStatusUpdate = true
} }
if ckpt.Status.Phase != nvidiacomv1alpha1.DynamoCheckpointPhaseCreating &&
ckpt.Status.JobName != "" &&
ckpt.Status.JobName != desiredJobName {
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhasePending
ckpt.Status.JobName = ""
ckpt.Status.CreatedAt = nil
ckpt.Status.Message = ""
needsStatusUpdate = true
}
if needsStatusUpdate { if needsStatusUpdate {
if err := r.Status().Update(ctx, ckpt); err != nil { if err := r.Status().Update(ctx, ckpt); err != nil {
logger.Error(err, "Failed to initialize DynamoCheckpoint status") logger.Error(err, "Failed to initialize DynamoCheckpoint status")
...@@ -156,11 +199,7 @@ func (r *CheckpointReconciler) Reconcile(ctx context.Context, req ctrl.Request) ...@@ -156,11 +199,7 @@ func (r *CheckpointReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Nothing to do, checkpoint is ready // Nothing to do, checkpoint is ready
return ctrl.Result{}, nil return ctrl.Result{}, nil
case nvidiacomv1alpha1.DynamoCheckpointPhaseFailed: case nvidiacomv1alpha1.DynamoCheckpointPhaseFailed:
// Re-evaluate the Job in case retries succeeded after a transient failure. return ctrl.Result{}, nil
if ckpt.Status.JobName == "" {
return ctrl.Result{}, nil
}
return r.handleCreating(ctx, ckpt)
default: default:
// Unknown phase, reset to Pending // Unknown phase, reset to Pending
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhasePending ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhasePending
...@@ -182,7 +221,12 @@ func (r *CheckpointReconciler) handlePending(ctx context.Context, ckpt *nvidiaco ...@@ -182,7 +221,12 @@ func (r *CheckpointReconciler) handlePending(ctx context.Context, ckpt *nvidiaco
return ctrl.Result{}, fmt.Errorf("failed to compute checkpoint identity hash: %w", err) return ctrl.Result{}, fmt.Errorf("failed to compute checkpoint identity hash: %w", err)
} }
} }
jobName := fmt.Sprintf("checkpoint-job-%s", hash) version := desiredArtifactVersion(ckpt)
jobName := desiredCheckpointJobName(ckpt, hash)
location, storageType, err := checkpoint.ResolveCheckpointStorage(hash, version, &r.Config.Checkpoint)
if err != nil {
return ctrl.Result{}, err
}
// Use SyncResource to create/update the checkpoint Job // Use SyncResource to create/update the checkpoint Job
modified, _, err := commonController.SyncResource(ctx, r, ckpt, func(ctx context.Context) (*batchv1.Job, bool, error) { modified, _, err := commonController.SyncResource(ctx, r, ckpt, func(ctx context.Context) (*batchv1.Job, bool, error) {
...@@ -201,6 +245,9 @@ func (r *CheckpointReconciler) handlePending(ctx context.Context, ckpt *nvidiaco ...@@ -201,6 +245,9 @@ func (r *CheckpointReconciler) handlePending(ctx context.Context, ckpt *nvidiaco
// Update status to Creating phase // Update status to Creating phase
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseCreating ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseCreating
ckpt.Status.JobName = jobName ckpt.Status.JobName = jobName
ckpt.Status.Location = location
ckpt.Status.StorageType = storageType
ckpt.Status.CreatedAt = nil
ckpt.Status.Message = "" ckpt.Status.Message = ""
meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{ meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCreated), Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCreated),
...@@ -234,9 +281,7 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac ...@@ -234,9 +281,7 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac
job := &batchv1.Job{} job := &batchv1.Job{}
if err := r.Get(ctx, client.ObjectKey{Namespace: ckpt.Namespace, Name: ckpt.Status.JobName}, job); err != nil { if err := r.Get(ctx, client.ObjectKey{Namespace: ckpt.Namespace, Name: ckpt.Status.JobName}, job); err != nil {
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
// Job was deleted, go back to Pending ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseFailed
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhasePending
ckpt.Status.JobName = ""
ckpt.Status.Message = "checkpoint job was deleted" ckpt.Status.Message = "checkpoint job was deleted"
meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{ meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCreated), Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCreated),
...@@ -337,15 +382,23 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac ...@@ -337,15 +382,23 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac
logger.Info("Checkpoint Job succeeded", "job", job.Name) logger.Info("Checkpoint Job succeeded", "job", job.Name)
r.Recorder.Event(ckpt, corev1.EventTypeNormal, "CheckpointReady", "Checkpoint creation completed successfully") r.Recorder.Event(ckpt, corev1.EventTypeNormal, "CheckpointReady", "Checkpoint creation completed successfully")
now := metav1.Now() if ckpt.Status.Location == "" || ckpt.Status.StorageType == "" {
location, storageType, err := checkpoint.ResolveCheckpointStorage(ckpt.Status.IdentityHash, &r.Config.Checkpoint) version := desiredArtifactVersion(ckpt)
if err != nil { location, storageType, err := checkpoint.ResolveCheckpointStorage(
return ctrl.Result{}, err ckpt.Status.IdentityHash,
version,
&r.Config.Checkpoint,
)
if err != nil {
return ctrl.Result{}, err
}
ckpt.Status.Location = location
ckpt.Status.StorageType = storageType
} }
now := metav1.Now()
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseReady ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseReady
ckpt.Status.CreatedAt = &now ckpt.Status.CreatedAt = &now
ckpt.Status.Location = location
ckpt.Status.StorageType = storageType
ckpt.Status.Message = "" ckpt.Status.Message = ""
meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{ meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCompleted), Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCompleted),
...@@ -421,6 +474,7 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo ...@@ -421,6 +474,7 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
if hash == "" { if hash == "" {
hash, _ = checkpoint.ComputeIdentityHash(ckpt.Spec.Identity) hash, _ = checkpoint.ComputeIdentityHash(ckpt.Spec.Identity)
} }
version := desiredArtifactVersion(ckpt)
// Add checkpoint-related labels // Add checkpoint-related labels
if podTemplate.Labels == nil { if podTemplate.Labels == nil {
...@@ -429,7 +483,11 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo ...@@ -429,7 +483,11 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
if podTemplate.Annotations == nil { if podTemplate.Annotations == nil {
podTemplate.Annotations = make(map[string]string) podTemplate.Annotations = make(map[string]string)
} }
location, storageType, err := checkpoint.ResolveCheckpointStorage(hash, &r.Config.Checkpoint) location, storageType, err := checkpoint.ResolveCheckpointStorage(
hash,
version,
&r.Config.Checkpoint,
)
if err != nil { if err != nil {
location = "" location = ""
storageType = "" storageType = ""
......
...@@ -58,6 +58,8 @@ var testHash = func() string { ...@@ -58,6 +58,8 @@ var testHash = func() string {
return hash return hash
}() }()
// defaultCheckpointJobName is the checkpoint Job name for testHash at the
// default artifact version: "checkpoint-job-<testHash>-<default version>".
var defaultCheckpointJobName = "checkpoint-job-" + testHash + "-" + consts.DefaultCheckpointArtifactVersion
func checkpointTestScheme() *runtime.Scheme { func checkpointTestScheme() *runtime.Scheme {
s := runtime.NewScheme() s := runtime.NewScheme()
_ = nvidiacomv1alpha1.AddToScheme(s) _ = nvidiacomv1alpha1.AddToScheme(s)
...@@ -141,7 +143,7 @@ func TestBuildCheckpointJob(t *testing.T) { ...@@ -141,7 +143,7 @@ func TestBuildCheckpointJob(t *testing.T) {
} }
r := makeCheckpointReconciler(s, ckpt) r := makeCheckpointReconciler(s, ckpt)
job := r.buildCheckpointJob(ckpt, "checkpoint-job-"+testHash) job := r.buildCheckpointJob(ckpt, defaultCheckpointJobName)
podSpec := job.Spec.Template.Spec podSpec := job.Spec.Template.Spec
main := podSpec.Containers[0] main := podSpec.Containers[0]
...@@ -239,7 +241,7 @@ func TestBuildCheckpointJob(t *testing.T) { ...@@ -239,7 +241,7 @@ func TestBuildCheckpointJob(t *testing.T) {
ckpt.Spec.Job.ActiveDeadlineSeconds = &deadline ckpt.Spec.Job.ActiveDeadlineSeconds = &deadline
ckpt.Spec.Job.BackoffLimit = &backoff //nolint:staticcheck // Compatibility test: deprecated field must remain ignored by checkpoint Jobs. ckpt.Spec.Job.BackoffLimit = &backoff //nolint:staticcheck // Compatibility test: deprecated field must remain ignored by checkpoint Jobs.
ckpt.Spec.Job.TTLSecondsAfterFinished = &ttl ckpt.Spec.Job.TTLSecondsAfterFinished = &ttl
job = r.buildCheckpointJob(ckpt, "checkpoint-job-"+testHash) job = r.buildCheckpointJob(ckpt, defaultCheckpointJobName)
assert.Equal(t, int64(7200), *job.Spec.ActiveDeadlineSeconds) assert.Equal(t, int64(7200), *job.Spec.ActiveDeadlineSeconds)
assert.Equal(t, int32(0), *job.Spec.BackoffLimit) assert.Equal(t, int32(0), *job.Spec.BackoffLimit)
assert.Equal(t, int32(600), *job.Spec.TTLSecondsAfterFinished) assert.Equal(t, int32(600), *job.Spec.TTLSecondsAfterFinished)
...@@ -249,7 +251,7 @@ func TestBuildCheckpointJob(t *testing.T) { ...@@ -249,7 +251,7 @@ func TestBuildCheckpointJob(t *testing.T) {
corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("2"), corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("2"),
}, },
} }
job = r.buildCheckpointJob(ckpt, "checkpoint-job-"+testHash) job = r.buildCheckpointJob(ckpt, defaultCheckpointJobName)
assert.Equal(t, []string{"cuda-checkpoint", "--launch-job", "python3", "-m", "dynamo.vllm"}, job.Spec.Template.Spec.Containers[0].Command) assert.Equal(t, []string{"cuda-checkpoint", "--launch-job", "python3", "-m", "dynamo.vllm"}, job.Spec.Template.Spec.Containers[0].Command)
} }
...@@ -272,7 +274,7 @@ func TestBuildCheckpointJobInjectsStandardEnvVars(t *testing.T) { ...@@ -272,7 +274,7 @@ func TestBuildCheckpointJobInjectsStandardEnvVars(t *testing.T) {
customShmSize := resource.MustParse("16Gi") customShmSize := resource.MustParse("16Gi")
ckpt.Spec.Job.SharedMemory = &nvidiacomv1alpha1.SharedMemorySpec{Size: customShmSize} ckpt.Spec.Job.SharedMemory = &nvidiacomv1alpha1.SharedMemorySpec{Size: customShmSize}
job := r.buildCheckpointJob(ckpt, "checkpoint-job-"+testHash) job := r.buildCheckpointJob(ckpt, defaultCheckpointJobName)
foundCustomShmVolume := false foundCustomShmVolume := false
for _, v := range job.Spec.Template.Spec.Volumes { for _, v := range job.Spec.Template.Spec.Volumes {
if v.Name == consts.KubeValueNameSharedMemory { if v.Name == consts.KubeValueNameSharedMemory {
...@@ -366,7 +368,49 @@ func TestCheckpointReconciler_Reconcile(t *testing.T) { ...@@ -366,7 +368,49 @@ func TestCheckpointReconciler_Reconcile(t *testing.T) {
updated := &nvidiacomv1alpha1.DynamoCheckpoint{} updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated)) require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase) assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase)
assert.Equal(t, "checkpoint-job-"+testHash, updated.Status.JobName) assert.Equal(t, defaultCheckpointJobName, updated.Status.JobName)
})
// A Ready checkpoint whose recorded job name and location belong to the
// default artifact version, but whose annotation asks for version "2", must
// be reconciled into a new job for version 2.
t.Run("artifact version bump starts a new checkpoint job", func(t *testing.T) {
ckpt := makeTestCheckpoint(nvidiacomv1alpha1.DynamoCheckpointPhaseReady)
ckpt.Status.IdentityHash = testHash
ckpt.Status.JobName = defaultCheckpointJobName
ckpt.Status.Location = "/checkpoints/" + testHash + "/versions/" + consts.DefaultCheckpointArtifactVersion
// Request a different artifact version than the one recorded in status.
ckpt.Annotations = map[string]string{consts.KubeAnnotationCheckpointArtifactVersion: "2"}
r := makeCheckpointReconciler(s, ckpt)
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: ckpt.Name, Namespace: testNamespace},
})
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: ckpt.Name, Namespace: testNamespace}, updated))
// The checkpoint leaves Ready for Creating, and both the job name and the
// storage location now carry the requested version "2".
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase)
assert.Equal(t, "checkpoint-job-"+testHash+"-2", updated.Status.JobName)
assert.Equal(t, "/checkpoints/"+testHash+"/versions/2", updated.Status.Location)
})
// Two checkpoints with distinct human-readable names share the same identity
// hash; reconciling the second one must mark it Failed with a message that
// names the first.
t.Run("duplicate identity hash is rejected even with a readable name", func(t *testing.T) {
primary := makeTestCheckpoint(nvidiacomv1alpha1.DynamoCheckpointPhaseReady)
primary.Name = "friendly-primary"
primary.Status.IdentityHash = testHash
primary.Status.JobName = defaultCheckpointJobName
duplicate := makeTestCheckpoint(nvidiacomv1alpha1.DynamoCheckpointPhaseReady)
duplicate.Name = "friendly-duplicate"
duplicate.Status.IdentityHash = testHash
// NOTE(review): the duplicate records a "-2" job name; presumably this shows
// the rejection is keyed on the identity hash rather than on the artifact
// version — confirm against the reconciler.
duplicate.Status.JobName = "checkpoint-job-" + testHash + "-2"
r := makeCheckpointReconciler(s, primary, duplicate)
// Only the duplicate is reconciled; the primary is just present in the client.
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: duplicate.Name, Namespace: testNamespace},
})
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: duplicate.Name, Namespace: testNamespace}, updated))
// The failure message must point at the checkpoint that already owns the hash.
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed, updated.Status.Phase)
assert.Contains(t, updated.Status.Message, primary.Name)
}) })
} }
...@@ -386,10 +430,12 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { ...@@ -386,10 +430,12 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
} }
t.Run("succeeded job transitions to Ready", func(t *testing.T) { t.Run("succeeded job transitions to Ready", func(t *testing.T) {
ckpt := makeCreatingCkpt(testHash, "job-ok") ckpt := makeCreatingCkpt(testHash, defaultCheckpointJobName)
ckpt.Status.Location = "/checkpoints/" + testHash + "/versions/" + consts.DefaultCheckpointArtifactVersion
ckpt.Status.StorageType = "pvc"
job := &batchv1.Job{ job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "job-ok", Name: defaultCheckpointJobName,
Namespace: testNamespace, Namespace: testNamespace,
Annotations: map[string]string{checkpointStatusAnnotation: checkpointStatusCompleted}, Annotations: map[string]string{checkpointStatusAnnotation: checkpointStatusCompleted},
}, },
...@@ -408,7 +454,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { ...@@ -408,7 +454,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
updated := &nvidiacomv1alpha1.DynamoCheckpoint{} updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated)) require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseReady, updated.Status.Phase) assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseReady, updated.Status.Phase)
assert.Equal(t, "/checkpoints/"+testHash, updated.Status.Location) assert.Equal(t, "/checkpoints/"+testHash+"/versions/"+consts.DefaultCheckpointArtifactVersion, updated.Status.Location)
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointStorageType("pvc"), updated.Status.StorageType) assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointStorageType("pvc"), updated.Status.StorageType)
assert.NotNil(t, updated.Status.CreatedAt) assert.NotNil(t, updated.Status.CreatedAt)
}) })
...@@ -542,6 +588,35 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { ...@@ -542,6 +588,35 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase) assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase)
}) })
// The checkpoint tracks a job and location for the default artifact version,
// while its annotation has already been switched to version "2". When that
// job completes, the checkpoint must become Ready at the location recorded in
// status, not at a path derived from the new annotation.
t.Run("in-flight version changes do not relabel the running job's artifact", func(t *testing.T) {
// NOTE(review): makeCreatingCkpt is defined outside this view; presumably it
// returns a checkpoint in the Creating phase bound to the given job name.
ckpt := makeCreatingCkpt(testHash, defaultCheckpointJobName)
ckpt.Status.Location = "/checkpoints/" + testHash + "/versions/" + consts.DefaultCheckpointArtifactVersion
ckpt.Status.StorageType = "pvc"
// The desired version changes while the version-1 job is still the tracked job.
ckpt.Annotations = map[string]string{consts.KubeAnnotationCheckpointArtifactVersion: "2"}
// The job reports success both through the checkpoint status annotation and
// through a true JobComplete condition.
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: defaultCheckpointJobName,
Namespace: testNamespace,
Annotations: map[string]string{checkpointStatusAnnotation: checkpointStatusCompleted},
},
Status: batchv1.JobStatus{
Succeeded: 1,
Conditions: []batchv1.JobCondition{
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
},
},
}
r := makeCheckpointReconciler(s, ckpt, job)
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
// Ready is reported against the default-version path the job actually wrote to.
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseReady, updated.Status.Phase)
assert.Equal(t, "/checkpoints/"+testHash+"/versions/"+consts.DefaultCheckpointArtifactVersion, updated.Status.Location)
})
t.Run("succeeded count without complete condition keeps Creating phase", func(t *testing.T) { t.Run("succeeded count without complete condition keeps Creating phase", func(t *testing.T) {
ckpt := makeCreatingCkpt(testHash, "job-succeeded-not-complete") ckpt := makeCreatingCkpt(testHash, "job-succeeded-not-complete")
job := &batchv1.Job{ job := &batchv1.Job{
...@@ -558,7 +633,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { ...@@ -558,7 +633,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase) assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase)
}) })
t.Run("deleted job resets to Pending", func(t *testing.T) { t.Run("deleted job transitions to Failed without retrying", func(t *testing.T) {
ckpt := makeCreatingCkpt(testHash, "job-deleted") ckpt := makeCreatingCkpt(testHash, "job-deleted")
r := makeCheckpointReconciler(s, ckpt) // no job object r := makeCheckpointReconciler(s, ckpt) // no job object
...@@ -567,8 +642,9 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) { ...@@ -567,8 +642,9 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
updated := &nvidiacomv1alpha1.DynamoCheckpoint{} updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated)) require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhasePending, updated.Status.Phase) assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed, updated.Status.Phase)
assert.Empty(t, updated.Status.JobName) assert.Equal(t, "job-deleted", updated.Status.JobName)
assert.Equal(t, "checkpoint job was deleted", updated.Status.Message)
}) })
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment