Unverified commit 38bb9d37, authored by Schwinn Saereesitthipitak and committed by GitHub
Browse files

refactor: clean up checkpoint orchestration (#7309)


Signed-off-by: Schwinn Saereesitthipitak <schwinns@nvidia.com>
parent 9ea3acad
......@@ -272,6 +272,11 @@ func (in *DynamoCheckpointIdentity) DeepCopy() *DynamoCheckpointIdentity {
func (in *DynamoCheckpointJobConfig) DeepCopyInto(out *DynamoCheckpointJobConfig) {
*out = *in
in.PodTemplateSpec.DeepCopyInto(&out.PodTemplateSpec)
if in.SharedMemory != nil {
in, out := &in.SharedMemory, &out.SharedMemory
*out = new(SharedMemorySpec)
(*in).DeepCopyInto(*out)
}
if in.ActiveDeadlineSeconds != nil {
in, out := &in.ActiveDeadlineSeconds, &out.ActiveDeadlineSeconds
*out = new(int64)
......
......@@ -124,11 +124,12 @@ spec:
default: 3600
description: ActiveDeadlineSeconds specifies the maximum time the Job can run
format: int64
minimum: 1
type: integer
backoffLimit:
default: 3
description: BackoffLimit specifies the number of retries before marking the Job failed
description: 'Deprecated: BackoffLimit is ignored. Checkpoint Jobs never retry.'
format: int32
minimum: 0
type: integer
podTemplateSpec:
description: |-
......@@ -8154,10 +8155,28 @@ spec:
- containers
type: object
type: object
sharedMemory:
description: |-
SharedMemory controls the tmpfs mounted at /dev/shm for the checkpoint Job pod.
When omitted, checkpoint Jobs use the same default 8Gi tmpfs as Dynamo components.
properties:
disabled:
type: boolean
size:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
type: object
x-kubernetes-validations:
- message: sharedMemory.size must not be set when sharedMemory.disabled is true
rule: '!(self.disabled && has(self.size))'
ttlSecondsAfterFinished:
default: 300
description: TTLSecondsAfterFinished specifies how long to keep the Job after completion
format: int32
minimum: 0
type: integer
required:
- podTemplateSpec
......@@ -8170,7 +8189,7 @@ spec:
description: DynamoCheckpointStatus defines the observed state of DynamoCheckpoint
properties:
conditions:
description: Conditions represent the latest available observations of the checkpoint's state
description: 'DEPRECATED: Conditions are deprecated. Use status.phase instead.'
items:
description: Condition contains details for one aspect of the current state of this API Resource.
properties:
......
......@@ -678,8 +678,8 @@ spec:
properties:
checkpointRef:
description: |-
CheckpointRef references an existing Checkpoint CR to use
If specified, Identity is ignored and this checkpoint is used directly
CheckpointRef references an existing DynamoCheckpoint CR by metadata.name.
If specified, this service's Identity is ignored and the referenced checkpoint is used directly.
type: string
enabled:
default: false
......@@ -11211,6 +11211,9 @@ spec:
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
type: object
x-kubernetes-validations:
- message: sharedMemory.size must not be set when sharedMemory.disabled is true
rule: '!(self.disabled && has(self.size))'
subComponentType:
description: SubComponentType indicates the sub-role of this component (for example, "prefill").
type: string
......
......@@ -887,8 +887,8 @@ spec:
properties:
checkpointRef:
description: |-
CheckpointRef references an existing Checkpoint CR to use
If specified, Identity is ignored and this checkpoint is used directly
CheckpointRef references an existing DynamoCheckpoint CR by metadata.name.
If specified, this service's Identity is ignored and the referenced checkpoint is used directly.
type: string
enabled:
default: false
......@@ -11420,6 +11420,9 @@ spec:
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
type: object
x-kubernetes-validations:
- message: sharedMemory.size must not be set when sharedMemory.disabled is true
rule: '!(self.disabled && has(self.size))'
subComponentType:
description: SubComponentType indicates the sub-role of this component (for example, "prefill").
type: string
......@@ -11466,7 +11469,7 @@ spec:
description: IdentityHash is the computed hash of the checkpoint identity
type: string
ready:
description: Ready indicates if the checkpoint is ready for use
description: Ready indicates if the checkpoint was visible to the worker at startup
type: boolean
type: object
description: |-
......
......@@ -16,7 +16,7 @@
apiVersion: nvidia.com/v1alpha1
kind: DynamoCheckpoint
metadata:
name: vllm-llama3-8b-tp1
name: llama3-8b-bf16
spec:
# Identity - determines the checkpoint hash
identity:
......@@ -33,7 +33,6 @@ spec:
# Job configuration for checkpoint creation
job:
activeDeadlineSeconds: 3600
backoffLimit: 3
ttlSecondsAfterFinished: 300
podTemplateSpec:
spec:
......@@ -60,4 +59,3 @@ spec:
limits:
nvidia.com/gpu: 1
restartPolicy: Never
......@@ -242,17 +242,6 @@ These are injected into all components when the corresponding infrastructure ser
| --- | --- | --- | --- | --- |
| `OMPI_MCA_orte_keep_fqdn_hostnames` | Instructs OpenMPI to preserve FQDN hostnames for inter-node communication | `1` | `string` | Multinode deployments only |
### Checkpoint / Restore
These environment variables are injected when checkpoint/restore is enabled for a component.
| Variable | Purpose | Default | Type | Condition |
| --- | --- | --- | --- | --- |
| `DYN_CHECKPOINT_PATH` | Base directory where checkpoint data is stored | From operator checkpoint config `storage.pvc.basePath` | `string` | PVC storage type |
| `DYN_CHECKPOINT_LOCATION` | Full checkpoint URI (for non-PVC backends) | — | `string` | S3 or OCI storage type |
| `DYN_CHECKPOINT_HASH` | Identity hash that uniquely identifies the checkpoint | — | `string` | Always set when checkpoint is enabled |
| `SKIP_WAIT_FOR_CHECKPOINT` | Skips the checkpoint readiness polling loop; checks once and proceeds | — | `string` | Set on restored and DGD pods |
## Service Accounts
The following component types automatically receive dedicated service accounts:
......
......@@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
......@@ -80,34 +81,103 @@ func testInfo() *CheckpointInfo {
return &CheckpointInfo{Enabled: true, Hash: testHash}
}
// --- Helper function tests ---
// createHookClient wraps a client.Client and lets a test run a callback
// before a Create call is forwarded to the wrapped client.
type createHookClient struct {
	// Client is the embedded client; every method other than Create is
	// promoted from it unchanged.
	client.Client

	// onCreate, when non-nil, is invoked by Create with the context and the
	// object being created before the call reaches the embedded client.
	onCreate func(ctx context.Context, obj client.Object) error
}
func TestHelpers(t *testing.T) {
// GetPVCBasePath
assert.Equal(t, "", GetPVCBasePath(nil))
assert.Equal(t, "/checkpoints", GetPVCBasePath(testPVCConfig()))
// Create runs the onCreate hook (if one is installed) and then forwards the
// call to the embedded client.
//
// A hook error is returned as-is and the embedded client is not called; in
// that case the hook stays installed. After the hook returns nil it is
// cleared, so it succeeds at most once.
func (c *createHookClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
	hook := c.onCreate
	if hook == nil {
		return c.Client.Create(ctx, obj, opts...)
	}

	if err := hook(ctx, obj); err != nil {
		return err
	}

	// The hook succeeded: disarm it before delegating.
	c.onCreate = nil
	return c.Client.Create(ctx, obj, opts...)
}
// --- Resource helper tests ---
// getCheckpointInfoFromCheckpoint — ready
func TestHelpers(t *testing.T) {
// checkpointInfoFromObject — ready
hash, err := ComputeIdentityHash(testIdentity())
require.NoError(t, err)
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: "ckpt-abc"},
ObjectMeta: metav1.ObjectMeta{Name: hash},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: testIdentity()},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseReady, IdentityHash: testHash,
Location: "/checkpoints/" + testHash, StorageType: "pvc",
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseReady,
IdentityHash: hash,
Location: "/checkpoints/" + hash,
StorageType: "pvc",
},
}
info := getCheckpointInfoFromCheckpoint(ckpt)
info, err := checkpointInfoFromObject(ckpt)
require.NoError(t, err)
assert.True(t, info.Enabled)
assert.True(t, info.Ready)
assert.Equal(t, testHash, info.Hash)
assert.Equal(t, "/checkpoints/"+testHash, info.Location)
assert.Equal(t, hash, info.Hash)
assert.Equal(t, "/checkpoints/"+hash, info.Location)
assert.Equal(t, ckpt.Name, info.CheckpointName)
// getCheckpointInfoFromCheckpoint — not ready
// checkpointInfoFromObject — not ready
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseCreating
info = getCheckpointInfoFromCheckpoint(ckpt)
info, err = checkpointInfoFromObject(ckpt)
require.NoError(t, err)
assert.False(t, info.Ready)
}
// TestCreateOrGetAutoCheckpointDeduplicatesConcurrentSameHashCheckpoint checks
// the outcome of CreateOrGetAutoCheckpoint when another checkpoint with the
// same identity hash is written to the store just before its own Create call
// is forwarded: the call must succeed, return the pre-existing checkpoint's
// name, and leave exactly one DynamoCheckpoint in the store.
func TestCreateOrGetAutoCheckpointDeduplicatesConcurrentSameHashCheckpoint(t *testing.T) {
	ctx := context.Background()
	scheme := testScheme()
	identity := testIdentity()

	identityHash, err := ComputeIdentityHash(identity)
	require.NoError(t, err)

	// Checkpoint with a human-readable name that carries the same identity
	// hash in both its label and its status.
	existing := &nvidiacomv1alpha1.DynamoCheckpoint{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "friendly-checkpoint",
			Namespace: testNamespace,
			Labels: map[string]string{
				consts.KubeLabelCheckpointHash: identityHash,
			},
		},
		Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{
			Identity: identity,
			Job: nvidiacomv1alpha1.DynamoCheckpointJobConfig{
				PodTemplateSpec: corev1.PodTemplateSpec{},
			},
		},
		Status: nvidiacomv1alpha1.DynamoCheckpointStatus{
			IdentityHash: identityHash,
			Phase:        nvidiacomv1alpha1.DynamoCheckpointPhaseReady,
		},
	}

	store := fake.NewClientBuilder().WithScheme(scheme).Build()

	// When a DynamoCheckpoint is about to be created through the hooked
	// client, first write the pre-existing checkpoint straight into the
	// underlying store. Creates of any other type pass through untouched.
	simulateRace := func(ctx context.Context, obj client.Object) error {
		if _, isCheckpoint := obj.(*nvidiacomv1alpha1.DynamoCheckpoint); isCheckpoint {
			return store.Create(ctx, existing.DeepCopy())
		}
		return nil
	}
	hooked := &createHookClient{Client: store, onCreate: simulateRace}

	got, err := CreateOrGetAutoCheckpoint(ctx, hooked, testNamespace, identity, corev1.PodTemplateSpec{})
	require.NoError(t, err)
	assert.Equal(t, existing.Name, got.Name)

	// Only the pre-existing checkpoint may remain in the store.
	var stored nvidiacomv1alpha1.DynamoCheckpointList
	require.NoError(t, store.List(ctx, &stored))
	require.Len(t, stored.Items, 1)
	assert.Equal(t, existing.Name, stored.Items[0].Name)
}
// --- Injection idempotency tests ---
func TestInjectionIdempotency(t *testing.T) {
......@@ -126,87 +196,40 @@ func TestInjectionIdempotency(t *testing.T) {
assert.Len(t, container.VolumeMounts, 2)
}
// --- InjectCheckpointEnvVars tests ---
func TestApplyCheckpointPodMetadata(t *testing.T) {
t.Run("checkpoint source metadata uses annotations for location and storage", func(t *testing.T) {
labels := map[string]string{}
annotations := map[string]string{}
func TestInjectCheckpointEnvVars(t *testing.T) {
t.Run("PVC storage injects PATH and HASH", func(t *testing.T) {
container := &corev1.Container{}
InjectCheckpointEnvVars(container, testInfo(), testPVCConfig())
ApplyCheckpointSourcePodMetadata(labels, annotations, testHash, "/checkpoints/"+testHash, "pvc")
envMap := make(map[string]string, len(container.Env))
for _, e := range container.Env {
envMap[e.Name] = e.Value
}
assert.Equal(t, "/checkpoints", envMap[consts.EnvCheckpointPath])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
_, hasLocation := envMap[consts.EnvCheckpointLocation]
assert.False(t, hasLocation)
assert.Equal(t, consts.KubeLabelValueTrue, labels[consts.KubeLabelIsCheckpointSource])
assert.Equal(t, testHash, labels[consts.KubeLabelCheckpointHash])
assert.Equal(t, "/checkpoints/"+testHash, annotations[consts.KubeAnnotationCheckpointLocation])
assert.Equal(t, "pvc", annotations[consts.KubeAnnotationCheckpointStorageType])
})
t.Run("S3 storage injects LOCATION and HASH", func(t *testing.T) {
container := &corev1.Container{}
info := &CheckpointInfo{Enabled: true, Hash: testHash, Location: "s3://bucket/" + testHash + ".tar"}
config := &configv1alpha1.CheckpointConfiguration{
Storage: configv1alpha1.CheckpointStorageConfiguration{
Type: configv1alpha1.CheckpointStorageTypeS3,
S3: configv1alpha1.CheckpointS3Config{URI: "s3://bucket"},
},
}
InjectCheckpointEnvVars(container, info, config)
envMap := make(map[string]string, len(container.Env))
for _, e := range container.Env {
envMap[e.Name] = e.Value
t.Run("restore metadata clears stale values when checkpoint is not ready", func(t *testing.T) {
labels := map[string]string{
consts.KubeLabelIsRestoreTarget: consts.KubeLabelValueTrue,
consts.KubeLabelCheckpointHash: "stale-hash",
}
assert.Equal(t, "s3://bucket/"+testHash+".tar", envMap[consts.EnvCheckpointLocation])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
})
t.Run("disabled is a no-op", func(t *testing.T) {
container := &corev1.Container{}
InjectCheckpointEnvVars(container, &CheckpointInfo{Enabled: false}, testPVCConfig())
assert.Empty(t, container.Env)
})
t.Run("preserves existing env vars", func(t *testing.T) {
container := &corev1.Container{Env: []corev1.EnvVar{{Name: "EXISTING", Value: "keep"}}}
InjectCheckpointEnvVars(container, testInfo(), testPVCConfig())
envMap := make(map[string]string, len(container.Env))
for _, e := range container.Env {
envMap[e.Name] = e.Value
annotations := map[string]string{
consts.KubeAnnotationCheckpointLocation: "/checkpoints/stale-hash",
consts.KubeAnnotationCheckpointStorageType: "pvc",
}
assert.Equal(t, "keep", envMap["EXISTING"])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
})
}
// --- InjectCheckpointLabelsFromConfig tests ---
func TestInjectCheckpointLabelsFromConfig(t *testing.T) {
// Disabled/nil configs are no-ops
for _, cfg := range []*nvidiacomv1alpha1.ServiceCheckpointConfig{nil, {Enabled: false}} {
labels := map[string]string{"existing": "value"}
result, err := InjectCheckpointLabelsFromConfig(labels, cfg)
require.NoError(t, err)
assert.Equal(t, map[string]string{"existing": "value"}, result)
}
ApplyRestorePodMetadata(labels, annotations, &CheckpointInfo{Enabled: true, Ready: false})
// Enabled with identity adds hash label
identity := testIdentity()
result, err := InjectCheckpointLabelsFromConfig(nil, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, Identity: &identity,
_, hasRestoreTarget := labels[consts.KubeLabelIsRestoreTarget]
_, hasCheckpointHash := labels[consts.KubeLabelCheckpointHash]
_, hasLocation := annotations[consts.KubeAnnotationCheckpointLocation]
_, hasStorageType := annotations[consts.KubeAnnotationCheckpointStorageType]
assert.False(t, hasRestoreTarget)
assert.False(t, hasCheckpointHash)
assert.False(t, hasLocation)
assert.False(t, hasStorageType)
})
require.NoError(t, err)
hash, ok := result[consts.KubeLabelCheckpointHash]
assert.True(t, ok)
assert.Len(t, hash, 16)
// Enabled without identity does not add hash
result, err = InjectCheckpointLabelsFromConfig(map[string]string{}, &nvidiacomv1alpha1.ServiceCheckpointConfig{Enabled: true})
require.NoError(t, err)
_, ok = result[consts.KubeLabelCheckpointHash]
assert.False(t, ok)
}
// --- InjectCheckpointIntoPodSpec tests ---
......@@ -251,7 +274,7 @@ func TestInjectCheckpointIntoPodSpec(t *testing.T) {
require.NotNil(t, podSpec.SecurityContext.SeccompProfile)
})
t.Run("PVC storage injects volumes, mounts, and env vars", func(t *testing.T) {
t.Run("PVC storage injects volumes and mounts", func(t *testing.T) {
podSpec := testPodSpec()
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, testInfo(), testPVCConfig()))
......@@ -262,6 +285,20 @@ func TestInjectCheckpointIntoPodSpec(t *testing.T) {
if v.Name == consts.CheckpointVolumeName {
assert.Equal(t, "snapshot-pvc", v.PersistentVolumeClaim.ClaimName)
}
if v.Name == consts.PodInfoVolumeName {
require.NotNil(t, v.DownwardAPI)
fieldPaths := map[string]string{}
for _, item := range v.DownwardAPI.Items {
if item.FieldRef != nil {
fieldPaths[item.Path] = item.FieldRef.FieldPath
}
}
assert.Equal(t, "metadata.labels['"+consts.KubeLabelDynamoNamespace+"']", fieldPaths[consts.PodInfoFileDynNamespace])
assert.Equal(t, "metadata.labels['"+consts.KubeLabelDynamoWorkerHash+"']", fieldPaths[consts.PodInfoFileDynNamespaceWorkerSuffix])
assert.Equal(t, "metadata.labels['"+consts.KubeLabelDynamoComponentType+"']", fieldPaths[consts.PodInfoFileDynComponent])
assert.Equal(t, "metadata.labels['"+consts.KubeLabelDynamoGraphDeploymentName+"']", fieldPaths[consts.PodInfoFileDynParentDGDName])
assert.Equal(t, consts.PodInfoFieldPodNamespace, fieldPaths[consts.PodInfoFileDynParentDGDNamespace])
}
}
assert.True(t, volNames[consts.CheckpointVolumeName])
assert.True(t, volNames[consts.PodInfoVolumeName])
......@@ -273,14 +310,6 @@ func TestInjectCheckpointIntoPodSpec(t *testing.T) {
}
assert.Equal(t, "/checkpoints", mountPaths[consts.CheckpointVolumeName])
assert.Equal(t, consts.PodInfoMountPath, mountPaths[consts.PodInfoVolumeName])
// Env
envMap := make(map[string]string, len(podSpec.Containers[0].Env))
for _, e := range podSpec.Containers[0].Env {
envMap[e.Name] = e.Value
}
assert.Equal(t, "/checkpoints", envMap[consts.EnvCheckpointPath])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
})
t.Run("computes hash from identity when hash is empty", func(t *testing.T) {
......@@ -328,9 +357,6 @@ func TestInjectCheckpointIntoPodSpec(t *testing.T) {
{"PVC name missing", testPodSpec(), testInfo(), &configv1alpha1.CheckpointConfiguration{
Storage: configv1alpha1.CheckpointStorageConfiguration{Type: "pvc", PVC: configv1alpha1.CheckpointPVCConfig{BasePath: "/checkpoints"}},
}, "no PVC name"},
{"PVC base path missing", testPodSpec(), testInfo(), &configv1alpha1.CheckpointConfiguration{
Storage: configv1alpha1.CheckpointStorageConfiguration{Type: "pvc", PVC: configv1alpha1.CheckpointPVCConfig{PVCName: "snapshot-pvc"}},
}, "no PVC base path"},
{"S3 URI missing", testPodSpec(), testInfo(), &configv1alpha1.CheckpointConfiguration{
Storage: configv1alpha1.CheckpointStorageConfiguration{Type: "s3"},
}, "S3"},
......@@ -370,39 +396,48 @@ func TestResolveCheckpointForService(t *testing.T) {
})
t.Run("checkpointRef resolves ready CR", func(t *testing.T) {
hash, err := ComputeIdentityHash(testIdentity())
require.NoError(t, err)
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: "my-ckpt", Namespace: testNamespace},
ObjectMeta: metav1.ObjectMeta{Name: hash, Namespace: testNamespace},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: testIdentity()},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseReady, IdentityHash: testHash,
Location: "/checkpoints/" + testHash, StorageType: "pvc",
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseReady,
IdentityHash: hash,
Location: "/checkpoints/" + hash,
StorageType: "pvc",
},
}
c := fake.NewClientBuilder().WithScheme(s).WithObjects(ckpt).WithStatusSubresource(ckpt).Build()
ref := "my-ckpt"
ref := hash
info, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, CheckpointRef: &ref,
})
require.NoError(t, err)
assert.True(t, info.Exists)
assert.True(t, info.Ready)
assert.Equal(t, testHash, info.Hash)
assert.Equal(t, "/checkpoints/"+testHash, info.Location)
assert.Equal(t, hash, info.Hash)
assert.Equal(t, "/checkpoints/"+hash, info.Location)
assert.Equal(t, hash, info.CheckpointName)
})
t.Run("checkpointRef resolves not-ready CR", func(t *testing.T) {
hash, err := ComputeIdentityHash(testIdentity())
require.NoError(t, err)
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: "pending-ckpt", Namespace: testNamespace},
ObjectMeta: metav1.ObjectMeta{Name: hash, Namespace: testNamespace},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: testIdentity()},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseCreating},
}
c := fake.NewClientBuilder().WithScheme(s).WithObjects(ckpt).WithStatusSubresource(ckpt).Build()
ref := "pending-ckpt"
ref := hash
info, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, CheckpointRef: &ref,
})
require.NoError(t, err)
assert.True(t, info.Exists)
assert.False(t, info.Ready)
})
......@@ -415,20 +450,40 @@ func TestResolveCheckpointForService(t *testing.T) {
assert.ErrorContains(t, err, "nonexistent")
})
t.Run("identity lookup finds existing checkpoint by label", func(t *testing.T) {
t.Run("checkpointRef resolves human-readable checkpoint names", func(t *testing.T) {
hash, err := ComputeIdentityHash(testIdentity())
require.NoError(t, err)
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: "not-the-hash", Namespace: testNamespace},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: testIdentity()},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{
IdentityHash: hash,
},
}
c := fake.NewClientBuilder().WithScheme(s).WithObjects(ckpt).WithStatusSubresource(ckpt).Build()
ref := "not-the-hash"
info, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, CheckpointRef: &ref,
})
require.NoError(t, err)
assert.Equal(t, "not-the-hash", info.CheckpointName)
assert.Equal(t, hash, info.Hash)
})
t.Run("identity lookup finds existing checkpoint by identity hash", func(t *testing.T) {
identity := testIdentity()
hash, err := ComputeIdentityHash(identity)
require.NoError(t, err)
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Name: hash, Namespace: testNamespace,
Labels: map[string]string{consts.KubeLabelCheckpointHash: hash},
},
ObjectMeta: metav1.ObjectMeta{Name: "friendly-name", Namespace: testNamespace},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: identity},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseReady, IdentityHash: hash,
Location: "/checkpoints/" + hash, StorageType: "pvc",
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseReady,
IdentityHash: hash,
Location: "/checkpoints/" + hash,
StorageType: "pvc",
},
}
c := fake.NewClientBuilder().WithScheme(s).WithObjects(ckpt).WithStatusSubresource(ckpt).Build()
......@@ -437,8 +492,34 @@ func TestResolveCheckpointForService(t *testing.T) {
Enabled: true, Identity: &identity,
})
require.NoError(t, err)
assert.True(t, info.Exists)
assert.True(t, info.Ready)
assert.Equal(t, hash, info.Hash)
assert.Equal(t, "friendly-name", info.CheckpointName)
})
t.Run("identity lookup returns existing not-ready checkpoint", func(t *testing.T) {
identity := testIdentity()
hash, err := ComputeIdentityHash(identity)
require.NoError(t, err)
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: "friendly-name", Namespace: testNamespace},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: identity},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseCreating,
IdentityHash: hash,
},
}
c := fake.NewClientBuilder().WithScheme(s).WithObjects(ckpt).WithStatusSubresource(ckpt).Build()
info, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, Identity: &identity,
})
require.NoError(t, err)
assert.True(t, info.Exists)
assert.False(t, info.Ready)
assert.Equal(t, hash, info.Hash)
})
t.Run("identity lookup returns not-ready when no CR found", func(t *testing.T) {
......@@ -448,6 +529,7 @@ func TestResolveCheckpointForService(t *testing.T) {
Enabled: true, Identity: &identity,
})
require.NoError(t, err)
assert.False(t, info.Exists)
assert.False(t, info.Ready)
assert.Len(t, info.Hash, 16)
})
......
......@@ -18,288 +18,151 @@
package checkpoint
import (
"context"
"fmt"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// getCheckpointInfoFromCheckpoint builds a CheckpointInfo from a
// DynamoCheckpoint CR.
//
// The result is always marked Enabled. Hash, Location and StorageType are
// copied from the CR status, and Ready is true only when the status phase is
// DynamoCheckpointPhaseReady. Identity points at ckpt.Spec.Identity itself
// rather than at a copy.
func getCheckpointInfoFromCheckpoint(ckpt *nvidiacomv1alpha1.DynamoCheckpoint) *CheckpointInfo {
	status := &ckpt.Status
	isReady := status.Phase == nvidiacomv1alpha1.DynamoCheckpointPhaseReady

	return &CheckpointInfo{
		Enabled:        true,
		CheckpointName: ckpt.Name,
		Hash:           status.IdentityHash,
		Location:       status.Location,
		StorageType:    status.StorageType,
		Ready:          isReady,
		Identity:       &ckpt.Spec.Identity,
	}
}
// getPVCBasePath reads the PVC base path out of a storage configuration.
// It returns the empty string when storageConfig is nil or when no base path
// is set. Only meaningful for the PVC storage type.
func getPVCBasePath(storageConfig *configv1alpha1.CheckpointStorageConfiguration) string {
	if storageConfig == nil {
		return ""
	}
	// An unset BasePath is already "", so it can be returned directly.
	return storageConfig.PVC.BasePath
}
// GetPVCBasePath reports the PVC base path configured in the controller's
// checkpoint configuration, or the empty string when config is nil or no
// base path is set.
// It is shared by CheckpointReconciler and DynamoGraphDeploymentReconciler
// and only applies to the PVC storage type.
func GetPVCBasePath(config *configv1alpha1.CheckpointConfiguration) string {
	if config == nil {
		return ""
	}
	return getPVCBasePath(&config.Storage)
}
// CheckpointInfo is the resolved checkpoint state for a single DGD service.
type CheckpointInfo struct {
	// Enabled reports whether checkpointing is turned on for the service.
	Enabled bool

	// Identity is the resolved checkpoint identity (model, framework, etc.).
	Identity *nvidiacomv1alpha1.DynamoCheckpointIdentity

	// Hash is the hash computed from the identity.
	Hash string

	// Location is the full URI or path of the checkpoint in the storage backend.
	Location string

	// StorageType names the storage backend (pvc, s3, oci).
	StorageType nvidiacomv1alpha1.DynamoCheckpointStorageType

	// CheckpointName is the name of the Checkpoint CR.
	CheckpointName string

	// Ready reports whether the checkpoint is ready for use.
	Ready bool
}
// ResolveCheckpointForService resolves checkpoint information for a DGD service.
// It handles both checkpointRef (direct reference) and identity-based lookup.
// Returns CheckpointInfo with the resolved identity populated.
func ResolveCheckpointForService(
ctx context.Context,
c client.Client,
namespace string,
config *nvidiacomv1alpha1.ServiceCheckpointConfig,
) (*CheckpointInfo, error) {
if config == nil || !config.Enabled {
return &CheckpointInfo{Enabled: false}, nil
}
func ApplyCheckpointSourcePodMetadata(
labels map[string]string,
annotations map[string]string,
hash string,
location string,
storageType nvidiacomv1alpha1.DynamoCheckpointStorageType,
) {
delete(labels, commonconsts.KubeLabelIsRestoreTarget)
delete(labels, commonconsts.KubeLabelCheckpointHash)
delete(annotations, commonconsts.KubeAnnotationCheckpointLocation)
delete(annotations, commonconsts.KubeAnnotationCheckpointStorageType)
// If a direct checkpoint reference is provided, use it
if config.CheckpointRef != nil && *config.CheckpointRef != "" {
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{}
err := c.Get(ctx, types.NamespacedName{
Namespace: namespace,
Name: *config.CheckpointRef,
}, ckpt)
if err != nil {
return nil, fmt.Errorf("failed to get referenced checkpoint %s: %w", *config.CheckpointRef, err)
labels[commonconsts.KubeLabelIsCheckpointSource] = commonconsts.KubeLabelValueTrue
if hash != "" {
labels[commonconsts.KubeLabelCheckpointHash] = hash
}
// Extract all checkpoint info including identity from the CR
return getCheckpointInfoFromCheckpoint(ckpt), nil
if location != "" {
annotations[commonconsts.KubeAnnotationCheckpointLocation] = location
}
// Otherwise, compute hash from identity and look up checkpoint
if config.Identity == nil {
return nil, fmt.Errorf("checkpoint enabled but no checkpointRef or identity provided")
if storageType != "" {
annotations[commonconsts.KubeAnnotationCheckpointStorageType] = string(storageType)
}
hash, err := ComputeIdentityHash(*config.Identity)
if err != nil {
return nil, fmt.Errorf("failed to compute identity hash: %w", err)
}
info := &CheckpointInfo{
Enabled: true,
Identity: config.Identity,
Hash: hash,
}
// Look for existing checkpoint with matching hash using label selector
checkpointList := &nvidiacomv1alpha1.DynamoCheckpointList{}
if err = c.List(ctx, checkpointList,
client.InNamespace(namespace),
client.MatchingLabels{consts.KubeLabelCheckpointHash: info.Hash},
); err != nil {
return nil, fmt.Errorf("failed to list checkpoints: %w", err)
}
// Return the first matching checkpoint (there should be at most one per hash)
if len(checkpointList.Items) > 0 {
ckpt := &checkpointList.Items[0]
// Merge checkpoint info from the CR (overrides the computed values)
foundInfo := getCheckpointInfoFromCheckpoint(ckpt)
// Keep the hash and identity we computed from the config
foundInfo.Hash = info.Hash
foundInfo.Identity = info.Identity
return foundInfo, nil
}
// No existing checkpoint found
// In Auto mode, the controller should create one
return info, nil
}
// InjectCheckpointEnvVars adds checkpoint-related environment variables to a restored/DGD container.
// Sets PATH and HASH so the restored process knows its checkpoint identity.
// DYN_CHECKPOINT_LOCATION is reserved for future S3/OCI support.
func InjectCheckpointEnvVars(container *corev1.Container, info *CheckpointInfo, checkpointConfig *configv1alpha1.CheckpointConfiguration) {
if !info.Enabled {
return
}
func ApplyRestorePodMetadata(labels map[string]string, annotations map[string]string, checkpointInfo *CheckpointInfo) {
delete(labels, commonconsts.KubeLabelIsRestoreTarget)
delete(labels, commonconsts.KubeLabelCheckpointHash)
delete(annotations, commonconsts.KubeAnnotationCheckpointLocation)
delete(annotations, commonconsts.KubeAnnotationCheckpointStorageType)
var envVars []corev1.EnvVar
// For PVC storage: inject base path so the restored process knows its checkpoint location.
// For S3/OCI (future): inject DYN_CHECKPOINT_LOCATION directly.
storageType := configv1alpha1.CheckpointStorageTypePVC
if checkpointConfig != nil && checkpointConfig.Storage.Type != "" {
storageType = checkpointConfig.Storage.Type
if checkpointInfo == nil || !checkpointInfo.Enabled || !checkpointInfo.Ready {
return
}
switch storageType {
case configv1alpha1.CheckpointStorageTypePVC:
basePath := ""
if checkpointConfig != nil {
basePath = getPVCBasePath(&checkpointConfig.Storage)
}
envVars = append(envVars, corev1.EnvVar{
Name: consts.EnvCheckpointPath,
Value: basePath,
})
default:
// S3/OCI: inject full location URI directly
if info.Location != "" {
envVars = append(envVars, corev1.EnvVar{
Name: consts.EnvCheckpointLocation,
Value: info.Location,
})
labels[commonconsts.KubeLabelIsRestoreTarget] = commonconsts.KubeLabelValueTrue
if checkpointInfo.Hash != "" {
labels[commonconsts.KubeLabelCheckpointHash] = checkpointInfo.Hash
}
if checkpointInfo.Location != "" {
annotations[commonconsts.KubeAnnotationCheckpointLocation] = checkpointInfo.Location
}
if info.Hash != "" {
envVars = append(envVars, corev1.EnvVar{
Name: consts.EnvCheckpointHash,
Value: info.Hash,
})
if checkpointInfo.StorageType != "" {
annotations[commonconsts.KubeAnnotationCheckpointStorageType] = string(checkpointInfo.StorageType)
}
// Prepend checkpoint env vars to ensure they're available
container.Env = append(envVars, container.Env...)
}
// InjectCheckpointVolume adds the checkpoint PVC volume to a pod spec
func InjectCheckpointVolume(podSpec *corev1.PodSpec, pvcName string) {
// Check if volume already exists
for _, v := range podSpec.Volumes {
if v.Name == consts.CheckpointVolumeName {
for _, volume := range podSpec.Volumes {
if volume.Name == commonconsts.CheckpointVolumeName {
return
}
}
podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{
Name: consts.CheckpointVolumeName,
Name: commonconsts.CheckpointVolumeName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcName,
ReadOnly: false, // CRIU needs write access during restore
ReadOnly: false,
},
},
})
}
// InjectCheckpointVolumeMount adds the checkpoint volume mount to a container
func InjectCheckpointVolumeMount(container *corev1.Container, basePath string) {
// Check if mount already exists
for _, m := range container.VolumeMounts {
if m.Name == consts.CheckpointVolumeName {
for _, mount := range container.VolumeMounts {
if mount.Name == commonconsts.CheckpointVolumeName {
return
}
}
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: consts.CheckpointVolumeName,
Name: commonconsts.CheckpointVolumeName,
MountPath: basePath,
ReadOnly: false, // CRIU needs write access for restore.log and restore-criu.conf
ReadOnly: false,
})
}
// InjectPodInfoVolume adds a Downward API volume for pod identity and DGD info.
// This is critical for CRIU checkpoint/restore scenarios where environment variables
// contain stale values from the checkpoint source pod. The Downward API files
// always reflect the current pod's identity and DGD configuration.
func InjectPodInfoVolume(podSpec *corev1.PodSpec) {
// Check if volume already exists
for _, v := range podSpec.Volumes {
if v.Name == consts.PodInfoVolumeName {
for _, volume := range podSpec.Volumes {
if volume.Name == commonconsts.PodInfoVolumeName {
return
}
}
podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{
Name: consts.PodInfoVolumeName,
Name: commonconsts.PodInfoVolumeName,
VolumeSource: corev1.VolumeSource{
DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{
// Pod identity fields
{
Path: "pod_name",
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: consts.PodInfoFieldPodName,
FieldPath: commonconsts.PodInfoFieldPodName,
},
},
{
Path: "pod_uid",
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: consts.PodInfoFieldPodUID,
FieldPath: commonconsts.PodInfoFieldPodUID,
},
},
{
Path: "pod_namespace",
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: consts.PodInfoFieldPodNamespace,
FieldPath: commonconsts.PodInfoFieldPodNamespace,
},
},
// DGD info from annotations (for CRIU restore)
{
Path: consts.PodInfoFileDynNamespace,
Path: commonconsts.PodInfoFileDynNamespace,
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.annotations['" + consts.AnnotationDynNamespace + "']",
FieldPath: "metadata.labels['" + commonconsts.KubeLabelDynamoNamespace + "']",
},
},
{
Path: consts.PodInfoFileDynComponent,
Path: commonconsts.PodInfoFileDynNamespaceWorkerSuffix,
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.annotations['" + consts.AnnotationDynComponent + "']",
FieldPath: "metadata.labels['" + commonconsts.KubeLabelDynamoWorkerHash + "']",
},
},
{
Path: consts.PodInfoFileDynParentDGDName,
Path: commonconsts.PodInfoFileDynComponent,
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.annotations['" + consts.AnnotationDynParentDGDName + "']",
FieldPath: "metadata.labels['" + commonconsts.KubeLabelDynamoComponentType + "']",
},
},
{
Path: consts.PodInfoFileDynParentDGDNS,
Path: commonconsts.PodInfoFileDynParentDGDName,
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.annotations['" + consts.AnnotationDynParentDGDNS + "']",
FieldPath: "metadata.labels['" + commonconsts.KubeLabelDynamoGraphDeploymentName + "']",
},
},
{
Path: consts.PodInfoFileDynDiscoveryBackend,
Path: commonconsts.PodInfoFileDynParentDGDNamespace,
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.annotations['" + consts.AnnotationDynDiscoveryBackend + "']",
FieldPath: commonconsts.PodInfoFieldPodNamespace,
},
},
},
......@@ -308,35 +171,20 @@ func InjectPodInfoVolume(podSpec *corev1.PodSpec) {
})
}
// InjectPodInfoVolumeMount adds the Downward API volume mount to a container.
func InjectPodInfoVolumeMount(container *corev1.Container) {
// Check if mount already exists
for _, m := range container.VolumeMounts {
if m.Name == consts.PodInfoVolumeName {
for _, mount := range container.VolumeMounts {
if mount.Name == commonconsts.PodInfoVolumeName {
return
}
}
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: consts.PodInfoVolumeName,
MountPath: consts.PodInfoMountPath,
Name: commonconsts.PodInfoVolumeName,
MountPath: commonconsts.PodInfoMountPath,
ReadOnly: true,
})
}
// InjectCheckpointIntoPodSpec injects checkpoint configuration into a pod spec for
// external restore via the snapshot DaemonSet. The pod image is expected to be a
// runtime-compatible restore image (runtime + CRIU tooling). For ready checkpoints,
// the operator overrides command to `sleep infinity` so the watcher can trigger
// external restore via nsenter + nsrestore.
//
// Modifications applied:
// 1. Security context - seccomp profile (io_uring blocking, matches checkpoint environment)
// 2. Environment variables - checkpoint path and hash
// 3. Storage configuration - checkpoint PVC and Downward API (pod identity)
//
// No hostIPC, no privileged mode — those are only needed when CRIU runs inside the
// container. With external restore, all privilege lives in the DaemonSet.
func InjectCheckpointIntoPodSpec(
podSpec *corev1.PodSpec,
checkpointInfo *CheckpointInfo,
......@@ -351,6 +199,7 @@ func InjectCheckpointIntoPodSpec(
if info.Identity == nil {
return fmt.Errorf("checkpoint enabled but identity is nil and hash is not set")
}
hash, err := ComputeIdentityHash(*info.Identity)
if err != nil {
return fmt.Errorf("failed to compute identity hash: %w", err)
......@@ -358,10 +207,9 @@ func InjectCheckpointIntoPodSpec(
info.Hash = hash
}
// Find the main container (needed for volume mounts and env vars)
var mainContainer *corev1.Container
for i := range podSpec.Containers {
if podSpec.Containers[i].Name == consts.MainContainerName {
if podSpec.Containers[i].Name == commonconsts.MainContainerName {
mainContainer = &podSpec.Containers[i]
break
}
......@@ -373,26 +221,20 @@ func InjectCheckpointIntoPodSpec(
return fmt.Errorf("no container found to inject checkpoint config")
}
// When a ready checkpoint exists, override the container command to sleep infinity.
// The DaemonSet watcher detects this pod via the checkpoint-restore label and
// performs external restore (nsenter + nsrestore). When no checkpoint is ready,
// the original command runs (cold start).
if info.Ready {
mainContainer.Command = []string{"sleep", "infinity"}
mainContainer.Args = nil
}
// Seccomp profile to match checkpoint environment (blocks io_uring syscalls)
if podSpec.SecurityContext == nil {
podSpec.SecurityContext = &corev1.PodSecurityContext{}
}
podSpec.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeLocalhost,
LocalhostProfile: ptr.To(consts.SeccompProfilePath),
LocalhostProfile: ptr.To(commonconsts.SeccompProfilePath),
}
// Determine storage type and compute location/path
storageType := configv1alpha1.CheckpointStorageTypePVC // default
storageType := configv1alpha1.CheckpointStorageTypePVC
var storageConfig *configv1alpha1.CheckpointStorageConfiguration
if checkpointConfig != nil {
storageConfig = &checkpointConfig.Storage
......@@ -408,59 +250,30 @@ func InjectCheckpointIntoPodSpec(
return fmt.Errorf("S3 storage type selected but no S3 URI configured (set checkpoint.storage.s3.uri)")
}
info.Location = fmt.Sprintf("%s/%s.tar", storageConfig.S3.URI, info.Hash)
case configv1alpha1.CheckpointStorageTypeOCI:
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
if storageConfig == nil || storageConfig.OCI.URI == "" {
return fmt.Errorf("OCI storage type selected but no OCI URI configured (set checkpoint.storage.oci.uri)")
}
info.Location = fmt.Sprintf("%s:%s", storageConfig.OCI.URI, info.Hash)
default: // PVC
default:
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
basePath := getPVCBasePath(storageConfig)
basePath := ""
if storageConfig != nil && storageConfig.PVC.BasePath != "" {
basePath = storageConfig.PVC.BasePath
}
if storageConfig == nil || storageConfig.PVC.PVCName == "" {
return fmt.Errorf("PVC storage type selected but no PVC name configured (set checkpoint.storage.pvc.pvcName)")
}
pvcName := storageConfig.PVC.PVCName
if basePath == "" {
return fmt.Errorf("PVC storage type selected but no PVC base path configured (set checkpoint.storage.pvc.basePath)")
}
info.Location = fmt.Sprintf("%s/%s", basePath, info.Hash)
InjectCheckpointVolume(podSpec, pvcName)
InjectCheckpointVolume(podSpec, storageConfig.PVC.PVCName)
InjectCheckpointVolumeMount(mainContainer, basePath)
}
// Downward API volume for pod identity after CRIU restore
InjectPodInfoVolume(podSpec)
InjectPodInfoVolumeMount(mainContainer)
// Checkpoint environment variables (path, hash)
InjectCheckpointEnvVars(mainContainer, info, checkpointConfig)
return nil
}
// InjectCheckpointLabelsFromConfig stamps the checkpoint identity hash label onto
// labels when checkpointing is enabled and an identity is configured.
//
// A nil or disabled config returns labels untouched (possibly nil). Otherwise a
// nil map is replaced by a fresh one before it is returned. Only the identity
// hash label is written here; restore trigger labels are injected only when a
// concrete restore request is prepared.
func InjectCheckpointLabelsFromConfig(labels map[string]string, config *nvidiacomv1alpha1.ServiceCheckpointConfig) (map[string]string, error) {
	if config == nil || !config.Enabled {
		return labels, nil
	}

	if labels == nil {
		labels = map[string]string{}
	}

	// Without an identity there is nothing to hash, so no label is added.
	if config.Identity == nil {
		return labels, nil
	}

	identityHash, err := ComputeIdentityHash(*config.Identity)
	if err != nil {
		return nil, fmt.Errorf("failed to compute identity hash for labels: %w", err)
	}
	labels[consts.KubeLabelCheckpointHash] = identityHash
	return labels, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package checkpoint
import (
"context"
"fmt"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// CheckpointInfo is the resolved checkpoint state for a service, as returned by
// ResolveCheckpointForService.
type CheckpointInfo struct {
	// Enabled is false when the service has no checkpoint config or the config is disabled.
	Enabled bool
	// Exists is set when the info was built from an existing DynamoCheckpoint object.
	Exists bool
	// Identity is the checkpoint identity the hash is derived from.
	Identity *nvidiacomv1alpha1.DynamoCheckpointIdentity
	// Hash is the checkpoint identity hash.
	Hash string
	// Location is copied from the DynamoCheckpoint's Status.Location when built from an object.
	Location string
	// StorageType is copied from the DynamoCheckpoint's Status.StorageType when built from an object.
	StorageType nvidiacomv1alpha1.DynamoCheckpointStorageType
	// CheckpointName is the name of the backing DynamoCheckpoint object, if any.
	CheckpointName string
	// Ready is true when the backing DynamoCheckpoint is in the Ready phase.
	Ready bool
}
// checkpointInfoFromObject builds a CheckpointInfo describing an existing
// DynamoCheckpoint. The returned info is always marked Enabled and Exists; its
// Identity points into ckpt.Spec, and Ready reflects whether the object's phase
// is Ready. An error is returned only when the identity hash cannot be resolved.
func checkpointInfoFromObject(ckpt *nvidiacomv1alpha1.DynamoCheckpoint) (*CheckpointInfo, error) {
	identityHash, err := checkpointIdentityHash(ckpt)
	if err != nil {
		return nil, err
	}

	info := &CheckpointInfo{
		Enabled:        true,
		Exists:         true,
		CheckpointName: ckpt.Name,
		Identity:       &ckpt.Spec.Identity,
		Hash:           identityHash,
	}
	// The remaining fields are copied straight from the object's status.
	info.Location = ckpt.Status.Location
	info.StorageType = ckpt.Status.StorageType
	info.Ready = ckpt.Status.Phase == nvidiacomv1alpha1.DynamoCheckpointPhaseReady
	return info, nil
}
// ResolveCheckpointForService determines the checkpoint state for a service.
//
// Resolution order:
//  1. A nil or disabled config yields a CheckpointInfo with Enabled=false.
//  2. A non-empty CheckpointRef is fetched by name from namespace and converted.
//  3. Otherwise an Identity is required; its hash is used to look up an existing
//     DynamoCheckpoint. When none exists, the info carries only the identity and
//     hash. When one exists, the info is built from it but keeps the caller's
//     Identity pointer.
func ResolveCheckpointForService(
	ctx context.Context,
	c client.Client,
	namespace string,
	config *nvidiacomv1alpha1.ServiceCheckpointConfig,
) (*CheckpointInfo, error) {
	if config == nil || !config.Enabled {
		return &CheckpointInfo{Enabled: false}, nil
	}

	// An explicit reference takes precedence over identity-based lookup.
	if ref := config.CheckpointRef; ref != nil && *ref != "" {
		referenced := &nvidiacomv1alpha1.DynamoCheckpoint{}
		key := types.NamespacedName{Namespace: namespace, Name: *ref}
		if err := c.Get(ctx, key, referenced); err != nil {
			return nil, fmt.Errorf("failed to get referenced checkpoint %s: %w", *ref, err)
		}
		return checkpointInfoFromObject(referenced)
	}

	if config.Identity == nil {
		return nil, fmt.Errorf("checkpoint enabled but no checkpointRef or identity provided")
	}

	identityHash, err := ComputeIdentityHash(*config.Identity)
	if err != nil {
		return nil, fmt.Errorf("failed to compute identity hash: %w", err)
	}

	match, err := FindCheckpointByIdentityHash(ctx, c, namespace, identityHash, "")
	if err != nil {
		return nil, err
	}

	if match != nil {
		info, err := checkpointInfoFromObject(match)
		if err != nil {
			return nil, err
		}
		// Report the identity the caller asked for rather than the stored one.
		info.Identity = config.Identity
		return info, nil
	}

	return &CheckpointInfo{
		Enabled:  true,
		Identity: config.Identity,
		Hash:     identityHash,
	}, nil
}
// ResolveCheckpointStorage computes where the checkpoint identified by hash
// lives, and under which storage type, from the operator checkpoint config.
//
// The storage type defaults to PVC when config is nil or leaves the type empty.
// Locations are formatted as:
//   - S3:  <s3 uri>/<hash>.tar
//   - OCI: <oci uri>:<hash>
//   - PVC (and any other type): <pvc base path>/<hash>
//
// An error is returned when the setting required by the selected type is missing.
func ResolveCheckpointStorage(
	hash string,
	config *configv1alpha1.CheckpointConfiguration,
) (string, nvidiacomv1alpha1.DynamoCheckpointStorageType, error) {
	backend := configv1alpha1.CheckpointStorageTypePVC
	if config != nil && config.Storage.Type != "" {
		backend = config.Storage.Type
	}
	resolvedType := nvidiacomv1alpha1.DynamoCheckpointStorageType(backend)

	if backend == configv1alpha1.CheckpointStorageTypeS3 {
		if config == nil || config.Storage.S3.URI == "" {
			return "", "", fmt.Errorf("S3 storage type selected but no S3 URI configured (set checkpoint.storage.s3.uri)")
		}
		return fmt.Sprintf("%s/%s.tar", config.Storage.S3.URI, hash), resolvedType, nil
	}

	if backend == configv1alpha1.CheckpointStorageTypeOCI {
		if config == nil || config.Storage.OCI.URI == "" {
			return "", "", fmt.Errorf("OCI storage type selected but no OCI URI configured (set checkpoint.storage.oci.uri)")
		}
		return fmt.Sprintf("%s:%s", config.Storage.OCI.URI, hash), resolvedType, nil
	}

	// Everything else is handled as PVC-backed storage.
	if config == nil || config.Storage.PVC.BasePath == "" {
		return "", "", fmt.Errorf("PVC storage type selected but no PVC base path configured (set checkpoint.storage.pvc.basePath)")
	}
	return fmt.Sprintf("%s/%s", config.Storage.PVC.BasePath, hash), resolvedType, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package checkpoint
import (
"context"
"fmt"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// checkpointIdentityHash returns the identity hash for ckpt. The hash recorded
// in the object's status is preferred; when the status has none, the hash is
// computed from the spec identity.
func checkpointIdentityHash(ckpt *nvidiacomv1alpha1.DynamoCheckpoint) (string, error) {
	if recorded := ckpt.Status.IdentityHash; recorded != "" {
		return recorded, nil
	}

	computed, err := ComputeIdentityHash(ckpt.Spec.Identity)
	if err == nil {
		return computed, nil
	}
	return "", fmt.Errorf("failed to compute checkpoint hash for %s: %w", ckpt.Name, err)
}
// FindCheckpointByIdentityHash looks up the single DynamoCheckpoint in namespace
// whose identity hash equals hash, ignoring any object named excludeName.
//
// The lookup first lists by the checkpoint-hash label. If that yields nothing,
// it lists every checkpoint in the namespace and compares identity hashes, so
// checkpoints without the label still resolve. It returns (nil, nil) when there
// is no match and an error when more than one checkpoint matches. The returned
// object is a deep copy.
func FindCheckpointByIdentityHash(
	ctx context.Context,
	c client.Client,
	namespace string,
	hash string,
	excludeName string,
) (*nvidiacomv1alpha1.DynamoCheckpoint, error) {
	labeled := &nvidiacomv1alpha1.DynamoCheckpointList{}
	byHashLabel := client.MatchingLabels{consts.KubeLabelCheckpointHash: hash}
	if err := c.List(ctx, labeled, client.InNamespace(namespace), byHashLabel); err != nil {
		return nil, fmt.Errorf("failed to list checkpoints by hash label: %w", err)
	}

	var match *nvidiacomv1alpha1.DynamoCheckpoint
	for i := range labeled.Items {
		candidate := &labeled.Items[i]
		if candidate.Name == excludeName {
			continue
		}
		if match != nil {
			return nil, fmt.Errorf("multiple checkpoints found for identity hash %s", hash)
		}
		match = candidate.DeepCopy()
	}
	if match != nil {
		return match, nil
	}

	// Fall back to a full scan so legacy checkpoints without the hash label still resolve.
	all := &nvidiacomv1alpha1.DynamoCheckpointList{}
	if err := c.List(ctx, all, client.InNamespace(namespace)); err != nil {
		return nil, fmt.Errorf("failed to list checkpoints: %w", err)
	}

	for i := range all.Items {
		candidate := &all.Items[i]
		if candidate.Name == excludeName {
			continue
		}

		candidateHash, err := checkpointIdentityHash(candidate)
		if err != nil {
			return nil, err
		}

		switch {
		case candidateHash != hash:
			continue
		case match != nil:
			return nil, fmt.Errorf("multiple checkpoints found for identity hash %s", hash)
		}
		match = candidate.DeepCopy()
	}

	return match, nil
}
// CreateOrGetAutoCheckpoint returns the DynamoCheckpoint for identity in
// namespace, creating one named "checkpoint-<hash>" when needed.
//
// Outcomes:
//   - Create reports AlreadyExists: the existing object of that name is fetched
//     and returned, provided its identity hash matches; otherwise an error.
//   - Create succeeds but another checkpoint (under a different name) already
//     has the same identity hash: the new object is deleted and the other one
//     is returned.
//   - Create succeeds and no duplicate exists: the new object is returned.
//
// If the duplicate lookup itself fails, the newly created object is deleted
// before the lookup error is returned.
func CreateOrGetAutoCheckpoint(
	ctx context.Context,
	c client.Client,
	namespace string,
	identity nvidiacomv1alpha1.DynamoCheckpointIdentity,
	podTemplate corev1.PodTemplateSpec,
) (*nvidiacomv1alpha1.DynamoCheckpoint, error) {
	hash, err := ComputeIdentityHash(identity)
	if err != nil {
		return nil, fmt.Errorf("failed to compute identity hash: %w", err)
	}

	candidate := &nvidiacomv1alpha1.DynamoCheckpoint{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("checkpoint-%s", hash),
			Namespace: namespace,
			Labels: map[string]string{
				consts.KubeLabelCheckpointHash: hash,
			},
		},
		Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{
			Identity: identity,
			Job: nvidiacomv1alpha1.DynamoCheckpointJobConfig{
				PodTemplateSpec: podTemplate,
			},
		},
	}

	if createErr := c.Create(ctx, candidate); createErr != nil {
		if apierrors.IsAlreadyExists(createErr) {
			// Someone else owns this name already; adopt it if it is the same identity.
			current := &nvidiacomv1alpha1.DynamoCheckpoint{}
			lookupKey := types.NamespacedName{Name: candidate.Name, Namespace: namespace}
			if getErr := c.Get(ctx, lookupKey, current); getErr != nil {
				return nil, fmt.Errorf("failed to get checkpoint %s after already exists: %w", candidate.Name, getErr)
			}
			currentHash, hashErr := checkpointIdentityHash(current)
			if hashErr != nil {
				return nil, hashErr
			}
			if currentHash != hash {
				return nil, fmt.Errorf("checkpoint %s already exists with identity hash %s", candidate.Name, currentHash)
			}
			return current, nil
		}
		return nil, fmt.Errorf("failed to create checkpoint %s: %w", candidate.Name, createErr)
	}

	// The create went through; make sure no differently named checkpoint already
	// covers the same identity.
	duplicate, lookupErr := FindCheckpointByIdentityHash(ctx, c, namespace, hash, candidate.Name)
	if lookupErr != nil {
		deleteErr := c.Delete(ctx, candidate)
		if deleteErr != nil && !apierrors.IsNotFound(deleteErr) {
			return nil, fmt.Errorf("failed to clean up checkpoint %s after dedupe error: %v (lookup error: %w)", candidate.Name, deleteErr, lookupErr)
		}
		return nil, lookupErr
	}

	if duplicate == nil {
		return candidate, nil
	}

	if deleteErr := c.Delete(ctx, candidate); deleteErr != nil && !apierrors.IsNotFound(deleteErr) {
		return nil, fmt.Errorf("failed to delete duplicate checkpoint %s: %w", candidate.Name, deleteErr)
	}
	return duplicate, nil
}
......@@ -145,16 +145,13 @@ const (
// Kubernetes labels
KubeLabelIsCheckpointSource = "nvidia.com/snapshot-is-checkpoint-source" // Pod label that triggers DaemonSet auto-checkpoint
KubeLabelCheckpointHash = "nvidia.com/snapshot-checkpoint-hash" // Checkpoint identity hash (= DynamoCheckpoint CR name)
KubeLabelCheckpointHash = "nvidia.com/snapshot-checkpoint-hash" // Checkpoint identity hash used for lookup/reuse (may differ from DynamoCheckpoint metadata.name)
KubeLabelIsRestoreTarget = "nvidia.com/snapshot-is-restore-target" // Pod label that triggers DaemonSet auto-restore
KubeAnnotationCheckpointLocation = "nvidia.com/snapshot-checkpoint-location" // Pod annotation that tells snapshot-agent where the checkpoint lives
KubeAnnotationCheckpointStorageType = "nvidia.com/snapshot-checkpoint-storage-type" // Pod annotation that tells snapshot-agent which storage backend owns the checkpoint
// Environment variables injected into pods
EnvCheckpointStorageType = "DYN_CHECKPOINT_STORAGE_TYPE" // Storage backend (pvc, s3, oci) — checkpoint job pods only
EnvCheckpointLocation = "DYN_CHECKPOINT_LOCATION" // Full checkpoint URI — future S3/OCI; for PVC, use PATH+HASH instead
EnvCheckpointPath = "DYN_CHECKPOINT_PATH" // Base checkpoint directory (e.g., /checkpoints) — PVC restored pods
EnvCheckpointHash = "DYN_CHECKPOINT_HASH" // Identity hash — all checkpoint-related pods
EnvReadyForCheckpointFile = "DYN_READY_FOR_CHECKPOINT_FILE" // Ready-for-checkpoint file path — checkpoint job pods
EnvSkipWaitForCheckpoint = "SKIP_WAIT_FOR_CHECKPOINT" // Skip polling, check once — restored/DGD pods
// Checkpoint pod-internal constants
CheckpointVolumeName = "checkpoint-storage" // Pod-internal volume name for checkpoint PVC
......@@ -173,19 +170,12 @@ const (
PodInfoFieldPodUID = "metadata.uid"
PodInfoFieldPodNamespace = "metadata.namespace"
// Downward API file names for DGD annotations
// Downward API file names for restore identity
PodInfoFileDynNamespace = "dyn_namespace"
PodInfoFileDynNamespaceWorkerSuffix = "dyn_namespace_worker_suffix"
PodInfoFileDynComponent = "dyn_component"
PodInfoFileDynParentDGDName = "dyn_parent_dgd_name"
PodInfoFileDynParentDGDNS = "dyn_parent_dgd_namespace"
PodInfoFileDynDiscoveryBackend = "dyn_discovery_backend"
// Annotation keys for DGD info (exposed via Downward API)
AnnotationDynNamespace = "nvidia.com/dyn-namespace"
AnnotationDynComponent = "nvidia.com/dyn-component"
AnnotationDynParentDGDName = "nvidia.com/dyn-parent-dgd-name"
AnnotationDynParentDGDNS = "nvidia.com/dyn-parent-dgd-namespace"
AnnotationDynDiscoveryBackend = "nvidia.com/dyn-discovery-backend"
PodInfoFileDynParentDGDName = "dyn_parent_dgd_k8s_name"
PodInfoFileDynParentDGDNamespace = "dyn_parent_dgd_k8s_namespace"
// Rolling update annotations
AnnotationCurrentWorkerHash = "nvidia.com/current-worker-hash"
......
......@@ -20,11 +20,15 @@ package controller
import (
"context"
"fmt"
"time"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
batchv1 "k8s.io/api/batch/v1"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
......@@ -42,6 +46,12 @@ import (
commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
)
const (
	// checkpointStatusAnnotation is the Job annotation the reconciler reads to
	// learn the terminal checkpoint outcome.
	// NOTE(review): presumably written by the snapshot-agent — confirm against the agent.
	checkpointStatusAnnotation = "nvidia.com/snapshot-checkpoint-status"
	// checkpointStatusCompleted is the annotation value that confirms a usable checkpoint.
	checkpointStatusCompleted = "completed"
	// checkpointStatusFailed is the annotation value that reports a failed checkpoint.
	checkpointStatusFailed = "failed"
)
// CheckpointReconciler reconciles a DynamoCheckpoint object
type CheckpointReconciler struct {
client.Client
......@@ -50,26 +60,30 @@ type CheckpointReconciler struct {
Recorder record.EventRecorder
}
// getCheckpointLocation returns "<pvc base path>/<identityHash>", where the
// base path is resolved from the operator's checkpoint configuration.
func (r *CheckpointReconciler) getCheckpointLocation(identityHash string) string {
	return fmt.Sprintf("%s/%s", checkpoint.GetPVCBasePath(&r.Config.Checkpoint), identityHash)
}
// getCheckpointStorageType converts the storage type configured on the operator
// into the API's DynamoCheckpointStorageType. The configured value is passed
// through unchanged, so an empty configured type stays empty.
func (r *CheckpointReconciler) getCheckpointStorageType() nvidiacomv1alpha1.DynamoCheckpointStorageType {
	configured := r.Config.Checkpoint.Storage.Type
	return nvidiacomv1alpha1.DynamoCheckpointStorageType(configured)
}
// GetRecorder returns the reconciler's event recorder.
// It implements the controller_common.Reconciler interface.
func (r *CheckpointReconciler) GetRecorder() record.EventRecorder {
	return r.Recorder
}
// checkpointLeaseExpired reports whether lease is no longer held at time now.
//
// The lease counts as expired when it has no duration, or when it has neither
// a renew time nor an acquire time. Otherwise it expires once now is strictly
// after (renew time, or acquire time if never renewed) + lease duration.
func checkpointLeaseExpired(lease *coordinationv1.Lease, now time.Time) bool {
	durationSeconds := lease.Spec.LeaseDurationSeconds
	if durationSeconds == nil {
		return true
	}

	// Prefer the most recent renewal; fall back to the initial acquisition.
	heldSince := lease.Spec.RenewTime
	if heldSince == nil {
		heldSince = lease.Spec.AcquireTime
	}
	if heldSince == nil {
		return true
	}

	expiry := heldSince.Time.Add(time.Duration(*durationSeconds) * time.Second)
	return now.After(expiry)
}
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints/finalizers,verbs=update
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch
func (r *CheckpointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
......@@ -85,24 +99,52 @@ func (r *CheckpointReconciler) Reconcile(ctx context.Context, req ctrl.Request)
logger.Info("Reconciling DynamoCheckpoint", "name", ckpt.Name, "phase", ckpt.Status.Phase)
// Compute identity hash if not already set
if ckpt.Status.IdentityHash == "" {
hash, err := checkpoint.ComputeIdentityHash(ckpt.Spec.Identity)
identityHash, err := checkpoint.ComputeIdentityHash(ckpt.Spec.Identity)
if err != nil {
logger.Error(err, "Failed to compute identity hash")
return ctrl.Result{}, fmt.Errorf("failed to compute identity hash: %w", err)
logger.Error(err, "Failed to compute checkpoint identity hash")
return ctrl.Result{}, fmt.Errorf("failed to compute checkpoint identity hash: %w", err)
}
ckpt.Status.IdentityHash = hash
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhasePending
if ckpt.Labels == nil {
ckpt.Labels = map[string]string{}
}
if ckpt.Labels[consts.KubeLabelCheckpointHash] != identityHash {
ckpt.Labels[consts.KubeLabelCheckpointHash] = identityHash
if err := r.Update(ctx, ckpt); err != nil {
return ctrl.Result{}, err
}
if err := r.Get(ctx, req.NamespacedName, ckpt); err != nil {
return ctrl.Result{}, err
}
}
needsStatusUpdate := false
phaseWasEmpty := ckpt.Status.Phase == ""
if ckpt.Status.IdentityHash != identityHash {
ckpt.Status.IdentityHash = identityHash
needsStatusUpdate = true
}
switch ckpt.Status.Phase {
case "", nvidiacomv1alpha1.DynamoCheckpointPhasePending, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, nvidiacomv1alpha1.DynamoCheckpointPhaseReady, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed:
default:
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhasePending
ckpt.Status.Message = ""
needsStatusUpdate = true
}
if ckpt.Status.Phase == "" {
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhasePending
ckpt.Status.Message = ""
needsStatusUpdate = true
}
if needsStatusUpdate {
if err := r.Status().Update(ctx, ckpt); err != nil {
logger.Error(err, "Failed to update DynamoCheckpoint status with hash")
logger.Error(err, "Failed to initialize DynamoCheckpoint status")
return ctrl.Result{}, err
}
// Status update will trigger a new reconcile via the watch
if phaseWasEmpty {
return ctrl.Result{}, nil
}
}
// Handle based on current phase
switch ckpt.Status.Phase {
......@@ -132,7 +174,15 @@ func (r *CheckpointReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *CheckpointReconciler) handlePending(ctx context.Context, ckpt *nvidiacomv1alpha1.DynamoCheckpoint) (ctrl.Result, error) {
logger := log.FromContext(ctx)
jobName := fmt.Sprintf("checkpoint-%s", ckpt.Name)
hash := ckpt.Status.IdentityHash
if hash == "" {
var err error
hash, err = checkpoint.ComputeIdentityHash(ckpt.Spec.Identity)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to compute checkpoint identity hash: %w", err)
}
}
jobName := fmt.Sprintf("checkpoint-job-%s", hash)
// Use SyncResource to create/update the checkpoint Job
modified, _, err := commonController.SyncResource(ctx, r, ckpt, func(ctx context.Context) (*batchv1.Job, bool, error) {
......@@ -151,6 +201,7 @@ func (r *CheckpointReconciler) handlePending(ctx context.Context, ckpt *nvidiaco
// Update status to Creating phase
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseCreating
ckpt.Status.JobName = jobName
ckpt.Status.Message = ""
meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCreated),
Status: metav1.ConditionTrue,
......@@ -170,6 +221,15 @@ func (r *CheckpointReconciler) handlePending(ctx context.Context, ckpt *nvidiaco
func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiacomv1alpha1.DynamoCheckpoint) (ctrl.Result, error) {
logger := log.FromContext(ctx)
if ckpt.Status.JobName == "" {
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhasePending
ckpt.Status.Message = "checkpoint job is missing from status"
if err := r.Status().Update(ctx, ckpt); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// Check Job status
job := &batchv1.Job{}
if err := r.Get(ctx, client.ObjectKey{Namespace: ckpt.Namespace, Name: ckpt.Status.JobName}, job); err != nil {
......@@ -177,6 +237,7 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac
// Job was deleted, go back to Pending
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhasePending
ckpt.Status.JobName = ""
ckpt.Status.Message = "checkpoint job was deleted"
meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCreated),
Status: metav1.ConditionFalse,
......@@ -192,19 +253,100 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac
return ctrl.Result{}, err
}
// Check if job succeeded
if job.Status.Succeeded > 0 {
jobComplete := false
jobFailed := false
for _, condition := range job.Status.Conditions {
if condition.Status != corev1.ConditionTrue {
continue
}
if condition.Type == batchv1.JobComplete {
jobComplete = true
continue
}
if condition.Type == batchv1.JobFailed {
jobFailed = true
}
}
status := job.Annotations[checkpointStatusAnnotation]
if status == checkpointStatusFailed {
reason := "JobFailed"
message := "Checkpoint job failed"
if jobComplete {
reason = "CheckpointVerificationFailed"
message = "Checkpoint job completed but snapshot-agent reported checkpoint failure"
}
logger.Info("Checkpoint Job failed", "job", job.Name, "checkpoint_status", status)
r.Recorder.Event(ckpt, corev1.EventTypeWarning, "CheckpointFailed", message)
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseFailed
ckpt.Status.Message = message
meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCompleted),
Status: metav1.ConditionFalse,
Reason: reason,
Message: message,
LastTransitionTime: metav1.Now(),
})
if err := r.Status().Update(ctx, ckpt); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
if jobComplete {
if status != checkpointStatusCompleted {
lease := &coordinationv1.Lease{}
leaseKey := client.ObjectKey{Namespace: job.Namespace, Name: job.Name}
if err := r.Get(ctx, leaseKey, lease); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
} else if !checkpointLeaseExpired(lease, time.Now()) {
logger.V(1).Info("Checkpoint job is complete but checkpoint lease is still active; waiting for terminal watcher status", "job", job.Name)
return ctrl.Result{RequeueAfter: time.Second}, nil
}
reason := "CheckpointVerificationFailed"
message := "Checkpoint job completed without snapshot-agent completion confirmation"
if status == checkpointStatusFailed {
message = "Checkpoint job completed but snapshot-agent reported checkpoint failure"
}
logger.Info("Checkpoint Job completed without usable artifact", "job", job.Name, "checkpoint_status", status)
r.Recorder.Event(ckpt, corev1.EventTypeWarning, "CheckpointFailed", message)
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseFailed
ckpt.Status.Message = message
meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCompleted),
Status: metav1.ConditionFalse,
Reason: reason,
Message: message,
LastTransitionTime: metav1.Now(),
})
if err := r.Status().Update(ctx, ckpt); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
logger.Info("Checkpoint Job succeeded", "job", job.Name)
r.Recorder.Event(ckpt, corev1.EventTypeNormal, "CheckpointReady", "Checkpoint creation completed successfully")
now := metav1.Now()
location, storageType, err := checkpoint.ResolveCheckpointStorage(ckpt.Status.IdentityHash, &r.Config.Checkpoint)
if err != nil {
return ctrl.Result{}, err
}
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseReady
ckpt.Status.CreatedAt = &now
// Set checkpoint location and storage type using helper functions
ckpt.Status.Location = r.getCheckpointLocation(ckpt.Status.IdentityHash)
ckpt.Status.StorageType = r.getCheckpointStorageType()
ckpt.Status.Location = location
ckpt.Status.StorageType = storageType
ckpt.Status.Message = ""
meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCompleted),
Status: metav1.ConditionTrue,
......@@ -219,14 +361,6 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac
return ctrl.Result{}, nil
}
// Check if job reached terminal Failed condition.
jobFailed := false
for _, condition := range job.Status.Conditions {
if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue {
jobFailed = true
break
}
}
if jobFailed {
logger.Info("Checkpoint Job failed", "job", job.Name)
r.Recorder.Event(ckpt, corev1.EventTypeWarning, "CheckpointFailed", "Checkpoint creation failed")
......@@ -251,62 +385,155 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac
return ctrl.Result{}, nil
}
// buildCheckpointWorkerDefaultEnv returns the environment variables of the
// base worker container that dynamo.NewWorkerDefaults() produces for the given
// checkpoint and pod template.
//
// The component context is derived from the pod template labels:
//   - the Dynamo namespace label overrides consts.GlobalDynamoNamespace when it
//     is non-empty;
//   - the component-type label overrides consts.ComponentTypeWorker only when
//     it is non-empty and dynamo.IsWorkerComponent accepts it;
//   - the graph-deployment-name and worker-hash labels are passed through as-is
//     (empty when the label is absent).
//
// The parent graph deployment namespace is always the checkpoint's namespace and
// the discovery backend is always Kubernetes. The error returned by
// GetBaseContainer is discarded; only the container's Env is returned.
func (r *CheckpointReconciler) buildCheckpointWorkerDefaultEnv(
	ckpt *nvidiacomv1alpha1.DynamoCheckpoint,
	podTemplate *corev1.PodTemplateSpec,
) []corev1.EnvVar {
	// Indexing a nil label map is safe and yields "", so no nil check is needed.
	labels := podTemplate.Labels

	namespace := consts.GlobalDynamoNamespace
	if labelled := labels[consts.KubeLabelDynamoNamespace]; labelled != "" {
		namespace = labelled
	}

	kind := consts.ComponentTypeWorker
	if labelled := labels[consts.KubeLabelDynamoComponentType]; labelled != "" && dynamo.IsWorkerComponent(labelled) {
		kind = labelled
	}

	workerCtx := dynamo.ComponentContext{
		ComponentType:                  kind,
		DynamoNamespace:                namespace,
		ParentGraphDeploymentName:      labels[consts.KubeLabelDynamoGraphDeploymentName],
		ParentGraphDeploymentNamespace: ckpt.Namespace,
		DiscoveryBackend:               configv1alpha1.DiscoveryBackendKubernetes,
		WorkerHashSuffix:               labels[consts.KubeLabelDynamoWorkerHash],
	}

	base, _ := dynamo.NewWorkerDefaults().GetBaseContainer(workerCtx)
	return base.Env
}
func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.DynamoCheckpoint, jobName string) *batchv1.Job {
// Use the pod template from the spec
podTemplate := ckpt.Spec.Job.PodTemplateSpec.DeepCopy()
hash := ckpt.Status.IdentityHash
if hash == "" {
hash, _ = checkpoint.ComputeIdentityHash(ckpt.Spec.Identity)
}
// Add checkpoint-related labels
if podTemplate.Labels == nil {
podTemplate.Labels = make(map[string]string)
}
podTemplate.Labels[consts.KubeLabelCheckpointHash] = ckpt.Status.IdentityHash
podTemplate.Labels[consts.KubeLabelIsCheckpointSource] = "true"
if podTemplate.Annotations == nil {
podTemplate.Annotations = make(map[string]string)
}
location, storageType, err := checkpoint.ResolveCheckpointStorage(hash, &r.Config.Checkpoint)
if err != nil {
location = ""
storageType = ""
}
checkpoint.ApplyCheckpointSourcePodMetadata(podTemplate.Labels, podTemplate.Annotations, hash, location, storageType)
hasPodInfoVolume := false
for _, volume := range podTemplate.Spec.Volumes {
if volume.Name == consts.PodInfoVolumeName {
hasPodInfoVolume = true
break
}
}
if !hasPodInfoVolume {
podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, corev1.Volume{
Name: consts.PodInfoVolumeName,
VolumeSource: corev1.VolumeSource{
DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{
{
Path: consts.PodInfoFileDynNamespace,
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.labels['" + consts.KubeLabelDynamoNamespace + "']",
},
},
{
Path: consts.PodInfoFileDynNamespaceWorkerSuffix,
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.labels['" + consts.KubeLabelDynamoWorkerHash + "']",
},
},
{
Path: consts.PodInfoFileDynComponent,
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.labels['" + consts.KubeLabelDynamoComponentType + "']",
},
},
{
Path: consts.PodInfoFileDynParentDGDName,
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.labels['" + consts.KubeLabelDynamoGraphDeploymentName + "']",
},
},
{
Path: consts.PodInfoFileDynParentDGDNamespace,
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.namespace",
},
},
{
Path: "pod_name",
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: consts.PodInfoFieldPodName,
},
},
{
Path: "pod_uid",
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: consts.PodInfoFieldPodUID,
},
},
{
Path: "pod_namespace",
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: consts.PodInfoFieldPodNamespace,
},
},
},
},
},
})
}
// Add checkpoint env vars and volume mounts to main container
// Configure the main container for checkpoint mode.
if len(podTemplate.Spec.Containers) > 0 {
mainContainer := &podTemplate.Spec.Containers[0]
// Compute checkpoint location and storage type using helper functions
checkpointLocation := r.getCheckpointLocation(ckpt.Status.IdentityHash)
storageType := string(r.getCheckpointStorageType())
// Manual checkpoints start from a raw pod template, so re-apply the worker
// runtime env defaults before layering checkpoint-specific env on top.
mainContainer.Env = dynamo.MergeEnvs(
r.buildCheckpointWorkerDefaultEnv(ckpt, podTemplate),
mainContainer.Env,
)
dynamo.AddStandardEnvVars(mainContainer, r.Config)
// Add checkpoint-related env vars
// Add the ready-for-checkpoint signal path.
mainContainer.Env = append(mainContainer.Env,
// Ready file: Worker creates this when model is loaded
corev1.EnvVar{
Name: consts.EnvReadyForCheckpointFile,
Value: r.Config.Checkpoint.ReadyForCheckpointFilePath,
},
// Checkpoint hash: For idempotency check
corev1.EnvVar{
Name: consts.EnvCheckpointHash,
Value: ckpt.Status.IdentityHash,
},
// Checkpoint location: For idempotency check
corev1.EnvVar{
Name: consts.EnvCheckpointLocation,
Value: checkpointLocation,
},
// Storage type: For idempotency check (pvc, s3, oci)
corev1.EnvVar{
Name: consts.EnvCheckpointStorageType,
Value: storageType,
},
)
// Add checkpoint PVC volume and mount for mount namespace consistency with restore pods
// CRIU requires the exact same mount layout between checkpoint and restore
if r.Config.Checkpoint.Storage.PVC.PVCName != "" {
pvcName := r.Config.Checkpoint.Storage.PVC.PVCName
basePath := r.Config.Checkpoint.Storage.PVC.BasePath
checkpoint.InjectCheckpointVolume(&podTemplate.Spec, pvcName)
checkpoint.InjectCheckpointVolumeMount(mainContainer, basePath)
if gpus, ok := mainContainer.Resources.Limits[corev1.ResourceName(consts.KubeResourceGPUNvidia)]; ok && gpus.Cmp(*resource.NewQuantity(1, resource.DecimalSI)) > 0 {
mainContainer.Command = append([]string{"cuda-checkpoint", "--launch-job"}, mainContainer.Command...)
}
// Add Downward API volume for pod identity (mount namespace consistency with restore pods)
checkpoint.InjectPodInfoVolume(&podTemplate.Spec)
checkpoint.InjectPodInfoVolumeMount(mainContainer)
// Override probes for checkpoint mode
// Checkpoint jobs need different probe behavior than regular worker pods:
// - Readiness: Wait for model to load before checkpoint
......@@ -324,6 +551,23 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
mainContainer.LivenessProbe = nil
// Remove startup probe - not needed for checkpoint jobs
mainContainer.StartupProbe = nil
hasPodInfoMount := false
for _, mount := range mainContainer.VolumeMounts {
if mount.Name == consts.PodInfoVolumeName {
hasPodInfoMount = true
break
}
}
if !hasPodInfoMount {
mainContainer.VolumeMounts = append(mainContainer.VolumeMounts, corev1.VolumeMount{
Name: consts.PodInfoVolumeName,
MountPath: consts.PodInfoMountPath,
ReadOnly: true,
})
}
dynamo.ApplySharedMemoryVolumeAndMount(&podTemplate.Spec, mainContainer, ckpt.Spec.Job.SharedMemory)
}
// Set restart policy to Never for Jobs
......@@ -331,11 +575,12 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
// Apply seccomp profile to block io_uring syscalls
// CRIU doesn't support io_uring memory mappings, so we must block these syscalls
podTemplate.Spec.SecurityContext = &corev1.PodSecurityContext{
SeccompProfile: &corev1.SeccompProfile{
if podTemplate.Spec.SecurityContext == nil {
podTemplate.Spec.SecurityContext = &corev1.PodSecurityContext{}
}
podTemplate.Spec.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeLocalhost,
LocalhostProfile: ptr.To(consts.SeccompProfilePath),
},
}
// Build the Job
......@@ -345,12 +590,6 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
activeDeadlineSeconds = &defaultDeadline
}
backoffLimit := ckpt.Spec.Job.BackoffLimit
if backoffLimit == nil {
defaultBackoff := int32(3)
backoffLimit = &defaultBackoff
}
ttlSeconds := ckpt.Spec.Job.TTLSecondsAfterFinished
if ttlSeconds == nil {
defaultTTL := int32(300) // 5 minutes
......@@ -362,12 +601,13 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
Name: jobName,
Namespace: ckpt.Namespace,
Labels: map[string]string{
consts.KubeLabelCheckpointHash: ckpt.Status.IdentityHash,
consts.KubeLabelCheckpointHash: hash,
},
},
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: activeDeadlineSeconds,
BackoffLimit: backoffLimit,
// Checkpoint jobs are single-attempt to keep snapshot-agent status terminal.
BackoffLimit: ptr.To[int32](0),
TTLSecondsAfterFinished: ttlSeconds,
Template: *podTemplate,
},
......
......@@ -20,33 +20,50 @@ package controller
import (
"context"
"testing"
"time"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
const (
testHash = "abc123def4567890"
testNamespace = "default"
)
// testNamespace is the namespace assigned to the checkpoints, Jobs and Leases
// built by the helpers and tests in this file.
const testNamespace = "default"
// friendlyCheckpointName is a checkpoint name that is not the identity hash; it
// is used by the "human-readable checkpoint name" reconcile test.
const friendlyCheckpointName = "friendly-checkpoint"
// checkpointTestIdentity is the identity shared by every test checkpoint. It is
// the Spec.Identity set by makeTestCheckpoint and the input from which testHash
// is computed.
var checkpointTestIdentity = nvidiacomv1alpha1.DynamoCheckpointIdentity{
Model: "meta-llama/Llama-2-7b-hf",
BackendFramework: "vllm",
}
// testHash is the identity hash of checkpointTestIdentity, computed once at
// package initialisation via checkpoint.ComputeIdentityHash. Initialisation
// panics if the hash cannot be computed, since no test in this file is
// meaningful without it.
var testHash = func() string {
	computed, hashErr := checkpoint.ComputeIdentityHash(checkpointTestIdentity)
	if hashErr == nil {
		return computed
	}
	panic(hashErr)
}()
// checkpointTestScheme builds a fresh runtime.Scheme with the API groups the
// checkpoint controller tests rely on: the Dynamo v1alpha1 types, core/v1,
// batch/v1 and coordination/v1. Types are registered in that order and any
// registration error is deliberately ignored.
func checkpointTestScheme() *runtime.Scheme {
	scheme := runtime.NewScheme()
	registrars := []func(*runtime.Scheme) error{
		nvidiacomv1alpha1.AddToScheme,
		corev1.AddToScheme,
		batchv1.AddToScheme,
		coordinationv1.AddToScheme,
	}
	for _, register := range registrars {
		_ = register(scheme)
	}
	return scheme
}
......@@ -74,17 +91,20 @@ func makeCheckpointReconciler(s *runtime.Scheme, objs ...client.Object) *Checkpo
}
}
func makeTestCheckpoint(name string, phase nvidiacomv1alpha1.DynamoCheckpointPhase) *nvidiacomv1alpha1.DynamoCheckpoint {
func makeTestCheckpoint(phase nvidiacomv1alpha1.DynamoCheckpointPhase) *nvidiacomv1alpha1.DynamoCheckpoint {
runAsUser := int64(1234)
fsGroup := int64(4321)
return &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: testNamespace},
ObjectMeta: metav1.ObjectMeta{Name: testHash, Namespace: testNamespace},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{
Identity: nvidiacomv1alpha1.DynamoCheckpointIdentity{
Model: "meta-llama/Llama-2-7b-hf",
BackendFramework: "vllm",
},
Identity: checkpointTestIdentity,
Job: nvidiacomv1alpha1.DynamoCheckpointJobConfig{
PodTemplateSpec: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
SecurityContext: &corev1.PodSecurityContext{
RunAsUser: &runAsUser,
FSGroup: &fsGroup,
},
Containers: []corev1.Container{{
Name: "main",
Image: "test-image:latest",
......@@ -99,13 +119,29 @@ func makeTestCheckpoint(name string, phase nvidiacomv1alpha1.DynamoCheckpointPha
}
}
// makeCheckpointLease returns a coordination/v1 Lease named name in
// testNamespace, held by "snapshot-agent/test".
//
// renewTime is used for both AcquireTime and RenewTime (the two fields share
// one MicroTime value), and durationSeconds becomes LeaseDurationSeconds.
func makeCheckpointLease(name string, renewTime time.Time, durationSeconds int32) *coordinationv1.Lease {
	stamp := metav1.NewMicroTime(renewTime)

	lease := &coordinationv1.Lease{}
	lease.ObjectMeta = metav1.ObjectMeta{Name: name, Namespace: testNamespace}
	lease.Spec.HolderIdentity = ptr.To("snapshot-agent/test")
	lease.Spec.LeaseDurationSeconds = &durationSeconds
	// Both timestamps intentionally point at the same value.
	lease.Spec.AcquireTime = &stamp
	lease.Spec.RenewTime = &stamp
	return lease
}
func TestBuildCheckpointJob(t *testing.T) {
s := checkpointTestScheme()
ckpt := makeTestCheckpoint("test-ckpt", nvidiacomv1alpha1.DynamoCheckpointPhasePending)
ckpt.Status.IdentityHash = testHash
ckpt := makeTestCheckpoint(nvidiacomv1alpha1.DynamoCheckpointPhasePending)
ckpt.Spec.Job.PodTemplateSpec.Labels = map[string]string{
consts.KubeLabelDynamoNamespace: "manual-checkpoint",
consts.KubeLabelDynamoWorkerHash: "worker-1234",
}
r := makeCheckpointReconciler(s, ckpt)
job := r.buildCheckpointJob(ckpt, "checkpoint-test-ckpt")
job := r.buildCheckpointJob(ckpt, "checkpoint-job-"+testHash)
podSpec := job.Spec.Template.Spec
main := podSpec.Containers[0]
......@@ -120,16 +156,35 @@ func TestBuildCheckpointJob(t *testing.T) {
envMap[e.Name] = e.Value
}
assert.Equal(t, "/tmp/ready-for-checkpoint", envMap[consts.EnvReadyForCheckpointFile])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
assert.Equal(t, "/checkpoints/"+testHash, envMap[consts.EnvCheckpointLocation])
assert.Equal(t, "pvc", envMap[consts.EnvCheckpointStorageType])
assert.Equal(t, "manual-checkpoint", envMap[consts.DynamoNamespaceEnvVar])
assert.Equal(t, consts.ComponentTypeWorker, envMap[consts.DynamoComponentEnvVar])
assert.Equal(t, "worker-1234", envMap[consts.DynamoNamespaceWorkerSuffixEnvVar])
assert.Equal(t, "kubernetes", envMap[consts.DynamoDiscoveryBackendEnvVar])
assert.Equal(t, "9090", envMap["DYN_SYSTEM_PORT"])
assert.Equal(t, "true", envMap["DYN_SYSTEM_ENABLED"])
assert.Equal(t, "secret", envMap["HF_TOKEN"])
var podNameEnv *corev1.EnvVar
for i := range main.Env {
if main.Env[i].Name == "POD_NAME" {
podNameEnv = &main.Env[i]
break
}
}
require.NotNil(t, podNameEnv)
require.NotNil(t, podNameEnv.ValueFrom)
require.NotNil(t, podNameEnv.ValueFrom.FieldRef)
assert.Equal(t, "metadata.name", podNameEnv.ValueFrom.FieldRef.FieldPath)
// Seccomp profile
require.NotNil(t, podSpec.SecurityContext)
require.NotNil(t, podSpec.SecurityContext.SeccompProfile)
assert.Equal(t, corev1.SeccompProfileTypeLocalhost, podSpec.SecurityContext.SeccompProfile.Type)
assert.Equal(t, consts.SeccompProfilePath, *podSpec.SecurityContext.SeccompProfile.LocalhostProfile)
require.NotNil(t, podSpec.SecurityContext.RunAsUser)
assert.Equal(t, int64(1234), *podSpec.SecurityContext.RunAsUser)
require.NotNil(t, podSpec.SecurityContext.FSGroup)
assert.Equal(t, int64(4321), *podSpec.SecurityContext.FSGroup)
// Probes: readiness set, liveness/startup cleared
require.NotNil(t, main.ReadinessProbe)
......@@ -137,27 +192,35 @@ func TestBuildCheckpointJob(t *testing.T) {
assert.Nil(t, main.LivenessProbe)
assert.Nil(t, main.StartupProbe)
// Checkpoint PVC volume + mount
// Checkpoint jobs still mount podinfo for Kubernetes discovery, but not checkpoint storage.
volNames := make(map[string]bool)
for _, v := range podSpec.Volumes {
volNames[v.Name] = true
if v.Name == consts.CheckpointVolumeName {
require.NotNil(t, v.PersistentVolumeClaim)
assert.Equal(t, "snapshot-pvc", v.PersistentVolumeClaim.ClaimName)
}
if v.Name == consts.PodInfoVolumeName {
require.NotNil(t, v.DownwardAPI)
}
}
assert.True(t, volNames[consts.CheckpointVolumeName])
assert.False(t, volNames[consts.CheckpointVolumeName])
assert.True(t, volNames[consts.PodInfoVolumeName])
mountPaths := make(map[string]string)
for _, m := range main.VolumeMounts {
mountPaths[m.Name] = m.MountPath
}
assert.Equal(t, "/checkpoints", mountPaths[consts.CheckpointVolumeName])
_, hasCheckpointMount := mountPaths[consts.CheckpointVolumeName]
assert.False(t, hasCheckpointMount)
assert.Equal(t, consts.PodInfoMountPath, mountPaths[consts.PodInfoVolumeName])
assert.Equal(t, consts.DefaultSharedMemoryMountPath, mountPaths[consts.KubeValueNameSharedMemory])
foundSharedMemoryVolume := false
for _, v := range podSpec.Volumes {
if v.Name != consts.KubeValueNameSharedMemory {
continue
}
foundSharedMemoryVolume = true
require.NotNil(t, v.EmptyDir)
assert.Equal(t, corev1.StorageMediumMemory, v.EmptyDir.Medium)
require.NotNil(t, v.EmptyDir.SizeLimit)
assert.Equal(t, resource.MustParse(consts.DefaultSharedMemorySize), *v.EmptyDir.SizeLimit)
}
require.True(t, foundSharedMemoryVolume, "shared-memory volume not found: "+consts.KubeValueNameSharedMemory)
// Restart policy, user image/command preserved
assert.Equal(t, corev1.RestartPolicyNever, podSpec.RestartPolicy)
......@@ -166,20 +229,72 @@ func TestBuildCheckpointJob(t *testing.T) {
// Default deadlines
assert.Equal(t, int64(3600), *job.Spec.ActiveDeadlineSeconds)
assert.Equal(t, int32(3), *job.Spec.BackoffLimit)
assert.Equal(t, int32(0), *job.Spec.BackoffLimit)
assert.Equal(t, int32(300), *job.Spec.TTLSecondsAfterFinished)
// Custom deadlines override defaults
// Custom deadlines override defaults, but checkpoint jobs never retry.
deadline := int64(7200)
backoff := int32(5)
ttl := int32(600)
ckpt.Spec.Job.ActiveDeadlineSeconds = &deadline
ckpt.Spec.Job.BackoffLimit = &backoff
ckpt.Spec.Job.BackoffLimit = &backoff //nolint:staticcheck // Compatibility test: deprecated field must remain ignored by checkpoint Jobs.
ckpt.Spec.Job.TTLSecondsAfterFinished = &ttl
job = r.buildCheckpointJob(ckpt, "checkpoint-test-ckpt")
job = r.buildCheckpointJob(ckpt, "checkpoint-job-"+testHash)
assert.Equal(t, int64(7200), *job.Spec.ActiveDeadlineSeconds)
assert.Equal(t, int32(5), *job.Spec.BackoffLimit)
assert.Equal(t, int32(0), *job.Spec.BackoffLimit)
assert.Equal(t, int32(600), *job.Spec.TTLSecondsAfterFinished)
ckpt.Spec.Job.PodTemplateSpec.Spec.Containers[0].Resources = corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("2"),
},
}
job = r.buildCheckpointJob(ckpt, "checkpoint-job-"+testHash)
assert.Equal(t, []string{"cuda-checkpoint", "--launch-job", "python3", "-m", "dynamo.vllm"}, job.Spec.Template.Spec.Containers[0].Command)
}
// TestBuildCheckpointJobInjectsStandardEnvVars checks two properties of the Job
// produced by buildCheckpointJob:
//
//  1. a SharedMemory size set on the checkpoint spec becomes the SizeLimit of
//     the shared-memory EmptyDir volume;
//  2. env vars already set on the main container (NATS_SERVER, DYN_SYSTEM_PORT)
//     keep their values, while ETCD_ENDPOINTS, MODEL_EXPRESS_URL and
//     PROMETHEUS_ENDPOINT are populated from the reconciler's infrastructure
//     configuration.
func TestBuildCheckpointJobInjectsStandardEnvVars(t *testing.T) {
	s := checkpointTestScheme()
	ckpt := makeTestCheckpoint(nvidiacomv1alpha1.DynamoCheckpointPhasePending)

	// User-provided values that must not be overwritten by the defaults.
	userContainer := &ckpt.Spec.Job.PodTemplateSpec.Spec.Containers[0]
	userContainer.Env = append(
		userContainer.Env,
		corev1.EnvVar{Name: "NATS_SERVER", Value: "nats://custom:4222"},
		corev1.EnvVar{Name: "DYN_SYSTEM_PORT", Value: "10090"},
	)

	r := makeCheckpointReconciler(s, ckpt)
	r.Config.Infrastructure = configv1alpha1.InfrastructureConfiguration{
		NATSAddress:        "nats://platform:4222",
		ETCDAddress:        "http://etcd:2379",
		ModelExpressURL:    "http://model-express:8000",
		PrometheusEndpoint: "http://prometheus:9090",
	}

	customShmSize := resource.MustParse("16Gi")
	ckpt.Spec.Job.SharedMemory = &nvidiacomv1alpha1.SharedMemorySpec{Size: customShmSize}

	job := r.buildCheckpointJob(ckpt, "checkpoint-job-"+testHash)
	podSpec := job.Spec.Template.Spec

	// The shared-memory volume must exist and carry the custom size limit.
	foundCustomShmVolume := false
	for i := range podSpec.Volumes {
		vol := podSpec.Volumes[i]
		if vol.Name != consts.KubeValueNameSharedMemory {
			continue
		}
		foundCustomShmVolume = true
		require.NotNil(t, vol.EmptyDir)
		require.NotNil(t, vol.EmptyDir.SizeLimit)
		assert.Equal(t, customShmSize, *vol.EmptyDir.SizeLimit)
	}
	require.True(t, foundCustomShmVolume, "shared-memory volume not found: "+consts.KubeValueNameSharedMemory)

	// Index the main container's literal env values by name.
	main := podSpec.Containers[0]
	envMap := make(map[string]string, len(main.Env))
	for i := range main.Env {
		envMap[main.Env[i].Name] = main.Env[i].Value
	}

	expected := []struct {
		name  string
		value string
	}{
		{name: "NATS_SERVER", value: "nats://custom:4222"},
		{name: "DYN_SYSTEM_PORT", value: "10090"},
		{name: "ETCD_ENDPOINTS", value: "http://etcd:2379"},
		{name: "MODEL_EXPRESS_URL", value: "http://model-express:8000"},
		{name: "PROMETHEUS_ENDPOINT", value: "http://prometheus:9090"},
	}
	for _, want := range expected {
		assert.Equal(t, want.value, envMap[want.name])
	}
}
func TestCheckpointReconciler_Reconcile(t *testing.T) {
......@@ -196,45 +311,62 @@ func TestCheckpointReconciler_Reconcile(t *testing.T) {
})
t.Run("new CR computes hash and sets Pending", func(t *testing.T) {
ckpt := makeTestCheckpoint("new-ckpt", "")
ckpt := makeTestCheckpoint("")
r := makeCheckpointReconciler(s, ckpt)
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: "new-ckpt", Namespace: testNamespace},
NamespacedName: types.NamespacedName{Name: testHash, Namespace: testNamespace},
})
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "new-ckpt", Namespace: testNamespace}, updated))
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhasePending, updated.Status.Phase)
assert.Len(t, updated.Status.IdentityHash, 16)
assert.Equal(t, testHash, updated.Status.IdentityHash)
assert.Empty(t, updated.Status.Message)
assert.Equal(t, testHash, updated.Labels[consts.KubeLabelCheckpointHash])
})
t.Run("Ready phase is a no-op", func(t *testing.T) {
ckpt := makeTestCheckpoint("ready-ckpt", nvidiacomv1alpha1.DynamoCheckpointPhaseReady)
ckpt.Status.IdentityHash = testHash
ckpt := makeTestCheckpoint(nvidiacomv1alpha1.DynamoCheckpointPhaseReady)
r := makeCheckpointReconciler(s, ckpt)
result, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: "ready-ckpt", Namespace: testNamespace},
NamespacedName: types.NamespacedName{Name: ckpt.Name, Namespace: testNamespace},
})
require.NoError(t, err)
assert.Equal(t, ctrl.Result{}, result)
})
t.Run("human-readable checkpoint name backfills hash state", func(t *testing.T) {
ckpt := makeTestCheckpoint("")
ckpt.Name = friendlyCheckpointName
r := makeCheckpointReconciler(s, ckpt)
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: friendlyCheckpointName, Namespace: testNamespace},
})
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: friendlyCheckpointName, Namespace: testNamespace}, updated))
assert.Equal(t, testHash, updated.Labels[consts.KubeLabelCheckpointHash])
assert.Equal(t, testHash, updated.Status.IdentityHash)
})
t.Run("unknown phase resets to Pending", func(t *testing.T) {
ckpt := makeTestCheckpoint("unknown-ckpt", "SomeUnknownPhase")
ckpt.Status.IdentityHash = testHash
ckpt := makeTestCheckpoint("SomeUnknownPhase")
r := makeCheckpointReconciler(s, ckpt)
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: "unknown-ckpt", Namespace: testNamespace},
NamespacedName: types.NamespacedName{Name: testHash, Namespace: testNamespace},
})
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "unknown-ckpt", Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhasePending, updated.Status.Phase)
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase)
assert.Equal(t, "checkpoint-job-"+testHash, updated.Status.JobName)
})
}
......@@ -244,17 +376,29 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
// Helper to create a checkpoint CR in Creating phase with a named job
makeCreatingCkpt := func(name, jobName string) *nvidiacomv1alpha1.DynamoCheckpoint {
ckpt := makeTestCheckpoint(name, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating)
ckpt := makeTestCheckpoint(nvidiacomv1alpha1.DynamoCheckpointPhaseCreating)
if name != "" {
ckpt.Name = name
}
ckpt.Status.IdentityHash = testHash
ckpt.Status.JobName = jobName
return ckpt
}
t.Run("succeeded job transitions to Ready", func(t *testing.T) {
ckpt := makeCreatingCkpt("ckpt-ok", "job-ok")
ckpt := makeCreatingCkpt(testHash, "job-ok")
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "job-ok", Namespace: testNamespace},
Status: batchv1.JobStatus{Succeeded: 1},
ObjectMeta: metav1.ObjectMeta{
Name: "job-ok",
Namespace: testNamespace,
Annotations: map[string]string{checkpointStatusAnnotation: checkpointStatusCompleted},
},
Status: batchv1.JobStatus{
Succeeded: 1,
Conditions: []batchv1.JobCondition{
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
},
},
}
r := makeCheckpointReconciler(s, ckpt, job)
......@@ -262,7 +406,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "ckpt-ok", Namespace: testNamespace}, updated))
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseReady, updated.Status.Phase)
assert.Equal(t, "/checkpoints/"+testHash, updated.Status.Location)
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointStorageType("pvc"), updated.Status.StorageType)
......@@ -270,7 +414,7 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
})
t.Run("failed job transitions to Failed", func(t *testing.T) {
ckpt := makeCreatingCkpt("ckpt-fail", "job-fail")
ckpt := makeCreatingCkpt(testHash, "job-fail")
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "job-fail", Namespace: testNamespace},
Status: batchv1.JobStatus{
......@@ -283,12 +427,107 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "ckpt-fail", Namespace: testNamespace}, updated))
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed, updated.Status.Phase)
})
t.Run("completed job without completion annotation waits while lease is active", func(t *testing.T) {
ckpt := makeCreatingCkpt(testHash, "job-missing-status-active-lease")
completionTime := metav1.NewTime(time.Now())
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "job-missing-status-active-lease", Namespace: testNamespace},
Status: batchv1.JobStatus{
Succeeded: 1,
CompletionTime: &completionTime,
Conditions: []batchv1.JobCondition{
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: completionTime},
},
},
}
lease := makeCheckpointLease("job-missing-status-active-lease", time.Now(), 30)
r := makeCheckpointReconciler(s, ckpt, job, lease)
result, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
assert.Equal(t, time.Second, result.RequeueAfter)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase)
})
t.Run("completed job without completion annotation transitions to Failed once lease expires", func(t *testing.T) {
ckpt := makeCreatingCkpt(testHash, "job-missing-status")
completionTime := metav1.NewTime(time.Now().Add(-time.Minute))
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "job-missing-status", Namespace: testNamespace},
Status: batchv1.JobStatus{
Succeeded: 1,
CompletionTime: &completionTime,
Conditions: []batchv1.JobCondition{
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: completionTime},
},
},
}
r := makeCheckpointReconciler(s, ckpt, job)
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed, updated.Status.Phase)
assert.Contains(t, updated.Status.Message, "without snapshot-agent completion confirmation")
})
t.Run("completed job with failed completion annotation transitions to Failed", func(t *testing.T) {
ckpt := makeCreatingCkpt(testHash, "job-agent-failed")
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job-agent-failed",
Namespace: testNamespace,
Annotations: map[string]string{checkpointStatusAnnotation: checkpointStatusFailed},
},
Status: batchv1.JobStatus{
Succeeded: 1,
Conditions: []batchv1.JobCondition{
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
},
},
}
r := makeCheckpointReconciler(s, ckpt, job)
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed, updated.Status.Phase)
assert.Contains(t, updated.Status.Message, "snapshot-agent reported checkpoint failure")
})
t.Run("running job with failed checkpoint annotation transitions to Failed", func(t *testing.T) {
ckpt := makeCreatingCkpt(testHash, "job-running-agent-failed")
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job-running-agent-failed",
Namespace: testNamespace,
Annotations: map[string]string{checkpointStatusAnnotation: checkpointStatusFailed},
},
Status: batchv1.JobStatus{Active: 1},
}
r := makeCheckpointReconciler(s, ckpt, job)
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed, updated.Status.Phase)
assert.Equal(t, "Checkpoint job failed", updated.Status.Message)
})
t.Run("running job keeps Creating phase", func(t *testing.T) {
ckpt := makeCreatingCkpt("ckpt-run", "job-run")
ckpt := makeCreatingCkpt(testHash, "job-run")
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "job-run", Namespace: testNamespace},
Status: batchv1.JobStatus{Active: 1},
......@@ -299,20 +538,37 @@ func TestCheckpointReconciler_HandleCreating(t *testing.T) {
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "ckpt-run", Namespace: testNamespace}, updated))
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase)
})
t.Run("succeeded count without complete condition keeps Creating phase", func(t *testing.T) {
ckpt := makeCreatingCkpt(testHash, "job-succeeded-not-complete")
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "job-succeeded-not-complete", Namespace: testNamespace},
Status: batchv1.JobStatus{Succeeded: 1},
}
r := makeCheckpointReconciler(s, ckpt, job)
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase)
})
t.Run("deleted job resets to Pending", func(t *testing.T) {
ckpt := makeCreatingCkpt("ckpt-del", "job-deleted")
ckpt := makeCreatingCkpt(testHash, "job-deleted")
r := makeCheckpointReconciler(s, ckpt) // no job object
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "ckpt-del", Namespace: testNamespace}, updated))
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: testHash, Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhasePending, updated.Status.Phase)
assert.Empty(t, updated.Status.JobName)
})
}
......@@ -1068,19 +1068,19 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
maps.Copy(podAnnotations, extraPodMetadata.Annotations)
maps.Copy(podLabels, extraPodMetadata.Labels)
}
// Restore labels are operator-controlled. Clear any stale/user-provided
// value after metadata merge; the controller re-adds it only when the
// checkpoint contract below is satisfied.
delete(podLabels, commonconsts.KubeLabelIsRestoreTarget)
// Explicit restore orchestration contract:
// only mark pods as restore targets when checkpoint material is ready.
if checkpointInfo != nil && checkpointInfo.Enabled && checkpointInfo.Ready {
podLabels[commonconsts.KubeLabelIsRestoreTarget] = commonconsts.KubeLabelValueTrue
if checkpointInfo.Hash != "" {
podLabels[commonconsts.KubeLabelCheckpointHash] = checkpointInfo.Hash
podLabels[commonconsts.KubeLabelDynamoGraphDeploymentName] = opt.dynamoComponentDeployment.Spec.Labels[commonconsts.KubeLabelDynamoGraphDeploymentName]
if opt.dynamoComponentDeployment.Spec.ComponentType != "" {
podLabels[commonconsts.KubeLabelDynamoComponentType] = opt.dynamoComponentDeployment.Spec.ComponentType
}
if opt.dynamoComponentDeployment.Spec.DynamoNamespace != nil && *opt.dynamoComponentDeployment.Spec.DynamoNamespace != "" {
podLabels[commonconsts.KubeLabelDynamoNamespace] = *opt.dynamoComponentDeployment.Spec.DynamoNamespace
}
if workerHash := opt.dynamoComponentDeployment.Spec.Labels[commonconsts.KubeLabelDynamoWorkerHash]; workerHash != "" {
podLabels[commonconsts.KubeLabelDynamoWorkerHash] = workerHash
}
// Restore labels are operator-controlled state. Clear stale values after
// metadata merge and only reapply them when checkpoint material is ready.
checkpoint.ApplyRestorePodMetadata(podLabels, podAnnotations, checkpointInfo)
// Propagate restart annotation to pod template to trigger rolling restart
// This is the same mechanism used by kubectl rollout restart
......
......@@ -25,6 +25,7 @@ import (
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
......@@ -728,6 +729,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
commonconsts.KubeLabelMetricsEnabled: commonconsts.KubeLabelValueTrue,
"role": "leader",
"nvidia.com/label1": "label1",
commonconsts.KubeLabelDynamoNamespace: "default-test-lws-deploy",
commonconsts.KubeLabelDynamoComponentType: commonconsts.ComponentTypeWorker,
commonconsts.KubeLabelDynamoSubComponentType: "test-sub-component",
commonconsts.KubeLabelDynamoGraphDeploymentName: "",
......@@ -869,6 +871,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
commonconsts.KubeLabelMetricsEnabled: commonconsts.KubeLabelValueTrue,
"role": "worker",
"nvidia.com/label1": "label1",
commonconsts.KubeLabelDynamoNamespace: "default-test-lws-deploy",
commonconsts.KubeLabelDynamoComponentType: commonconsts.ComponentTypeWorker,
commonconsts.KubeLabelDynamoSubComponentType: "test-sub-component",
commonconsts.KubeLabelDynamoGraphDeploymentName: "",
......@@ -1267,6 +1270,7 @@ func TestDynamoComponentDeploymentReconciler_generatePodTemplateSpec_RestoreLabe
DynamoNamespace: ptr.To("default"),
Labels: map[string]string{
commonconsts.KubeLabelDynamoGraphDeploymentName: "test-dgd",
commonconsts.KubeLabelDynamoWorkerHash: "workerhash",
commonconsts.KubeLabelIsRestoreTarget: commonconsts.KubeLabelValueTrue,
},
Checkpoint: &v1alpha1.ServiceCheckpointConfig{
......@@ -1308,16 +1312,20 @@ func TestDynamoComponentDeploymentReconciler_generatePodTemplateSpec_RestoreLabe
}
t.Run("ready checkpoint adds explicit restore labels", func(t *testing.T) {
checkpointName := "ckpt-ready"
identity := v1alpha1.DynamoCheckpointIdentity{Model: "test-model", BackendFramework: "vllm"}
checkpointName, err := checkpoint.ComputeIdentityHash(identity)
if err != nil {
t.Fatalf("ComputeIdentityHash failed: %v", err)
}
dcd := makeDCD(checkpointName)
ckpt := &v1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Name: checkpointName,
Namespace: "default",
},
Spec: v1alpha1.DynamoCheckpointSpec{Identity: identity},
Status: v1alpha1.DynamoCheckpointStatus{
Phase: v1alpha1.DynamoCheckpointPhaseReady,
IdentityHash: "hash-ready-1",
},
}
......@@ -1334,22 +1342,76 @@ func TestDynamoComponentDeploymentReconciler_generatePodTemplateSpec_RestoreLabe
if got := podTemplateSpec.Labels[commonconsts.KubeLabelIsRestoreTarget]; got != commonconsts.KubeLabelValueTrue {
t.Fatalf("expected %s label to be true, got %q", commonconsts.KubeLabelIsRestoreTarget, got)
}
if got := podTemplateSpec.Labels[commonconsts.KubeLabelCheckpointHash]; got != "hash-ready-1" {
if got := podTemplateSpec.Labels[commonconsts.KubeLabelCheckpointHash]; got != checkpointName {
t.Fatalf("expected %s to be checkpoint hash, got %q", commonconsts.KubeLabelCheckpointHash, got)
}
})
t.Run("operator reasserts restore identity labels after metadata merge", func(t *testing.T) {
identity := v1alpha1.DynamoCheckpointIdentity{Model: "test-model", BackendFramework: "vllm"}
checkpointName, err := checkpoint.ComputeIdentityHash(identity)
if err != nil {
t.Fatalf("ComputeIdentityHash failed: %v", err)
}
dcd := makeDCD(checkpointName)
dcd.Spec.ExtraPodMetadata = &v1alpha1.ExtraPodMetadata{
Labels: map[string]string{
commonconsts.KubeLabelDynamoNamespace: "wrong-namespace",
commonconsts.KubeLabelDynamoComponentType: commonconsts.ComponentTypeFrontend,
commonconsts.KubeLabelDynamoGraphDeploymentName: "wrong-dgd",
commonconsts.KubeLabelDynamoWorkerHash: "wrong-hash",
},
}
ckpt := &v1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Name: checkpointName,
Namespace: "default",
},
Spec: v1alpha1.DynamoCheckpointSpec{Identity: identity},
Status: v1alpha1.DynamoCheckpointStatus{
Phase: v1alpha1.DynamoCheckpointPhaseReady,
},
}
r := makeReconciler(dcd, ckpt)
podTemplateSpec, err := r.generatePodTemplateSpec(
context.Background(),
generateResourceOption{dynamoComponentDeployment: dcd},
dynamo.RoleMain,
)
if err != nil {
t.Fatalf("generatePodTemplateSpec failed: %v", err)
}
if got := podTemplateSpec.Labels[commonconsts.KubeLabelDynamoNamespace]; got != defaultNamespace {
t.Fatalf("expected %s label to be %q, got %q", commonconsts.KubeLabelDynamoNamespace, "default", got)
}
if got := podTemplateSpec.Labels[commonconsts.KubeLabelDynamoComponentType]; got != commonconsts.ComponentTypeWorker {
t.Fatalf("expected %s label to be %q, got %q", commonconsts.KubeLabelDynamoComponentType, commonconsts.ComponentTypeWorker, got)
}
if got := podTemplateSpec.Labels[commonconsts.KubeLabelDynamoGraphDeploymentName]; got != "test-dgd" {
t.Fatalf("expected %s label to be %q, got %q", commonconsts.KubeLabelDynamoGraphDeploymentName, "test-dgd", got)
}
if got := podTemplateSpec.Labels[commonconsts.KubeLabelDynamoWorkerHash]; got != "workerhash" {
t.Fatalf("expected %s label to be %q, got %q", commonconsts.KubeLabelDynamoWorkerHash, "workerhash", got)
}
})
t.Run("non-ready checkpoint clears stale restore labels", func(t *testing.T) {
checkpointName := "ckpt-pending"
identity := v1alpha1.DynamoCheckpointIdentity{Model: "test-model", BackendFramework: "vllm"}
checkpointName, err := checkpoint.ComputeIdentityHash(identity)
if err != nil {
t.Fatalf("ComputeIdentityHash failed: %v", err)
}
dcd := makeDCD(checkpointName)
ckpt := &v1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Name: checkpointName,
Namespace: "default",
},
Spec: v1alpha1.DynamoCheckpointSpec{Identity: identity},
Status: v1alpha1.DynamoCheckpointStatus{
Phase: v1alpha1.DynamoCheckpointPhaseCreating,
IdentityHash: "hash-pending-1",
},
}
......@@ -1440,16 +1502,20 @@ func TestDynamoComponentDeploymentReconciler_generateDeployment_RestoreStrategy(
}
t.Run("ready checkpoint forces Recreate strategy", func(t *testing.T) {
checkpointName := "ckpt-ready"
identity := v1alpha1.DynamoCheckpointIdentity{Model: "test-model", BackendFramework: "vllm"}
checkpointName, err := checkpoint.ComputeIdentityHash(identity)
if err != nil {
t.Fatalf("ComputeIdentityHash failed: %v", err)
}
dcd := makeDCD(checkpointName)
ckpt := &v1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Name: checkpointName,
Namespace: "default",
},
Spec: v1alpha1.DynamoCheckpointSpec{Identity: identity},
Status: v1alpha1.DynamoCheckpointStatus{
Phase: v1alpha1.DynamoCheckpointPhaseReady,
IdentityHash: "hash-ready-1",
},
}
......@@ -1469,16 +1535,20 @@ func TestDynamoComponentDeploymentReconciler_generateDeployment_RestoreStrategy(
})
t.Run("non-ready checkpoint keeps RollingUpdate strategy", func(t *testing.T) {
checkpointName := "ckpt-creating"
identity := v1alpha1.DynamoCheckpointIdentity{Model: "test-model", BackendFramework: "vllm"}
checkpointName, err := checkpoint.ComputeIdentityHash(identity)
if err != nil {
t.Fatalf("ComputeIdentityHash failed: %v", err)
}
dcd := makeDCD(checkpointName)
ckpt := &v1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Name: checkpointName,
Namespace: "default",
},
Spec: v1alpha1.DynamoCheckpointSpec{Identity: identity},
Status: v1alpha1.DynamoCheckpointStatus{
Phase: v1alpha1.DynamoCheckpointPhaseCreating,
IdentityHash: "hash-creating-1",
},
}
......@@ -2068,6 +2138,100 @@ func Test_reconcileDeploymentResources(t *testing.T) {
}
}
// Test_reconcileDeploymentResources_DoesNotRecycleFailedRestorePods seeds the fake
// client with a DynamoComponentDeployment and a same-named Deployment whose single
// replica is updated but neither ready nor available, then checks the value
// returned by reconcileDeploymentResources.
//
// The expected result carries a False "DeploymentNotReady" condition together
// with a ServiceReplicaStatus that mirrors the seeded Deployment's counters.
func Test_reconcileDeploymentResources_DoesNotRecycleFailedRestorePods(t *testing.T) {
	g := gomega.NewGomegaWithT(t)
	ctx := context.Background()

	// Register every API group used by the seeded objects on the shared scheme.
	testScheme := scheme.Scheme
	g.Expect(v1alpha1.AddToScheme(testScheme)).To(gomega.Succeed())
	g.Expect(appsv1.AddToScheme(testScheme)).To(gomega.Succeed())
	g.Expect(corev1.AddToScheme(testScheme)).To(gomega.Succeed())

	// Component under reconciliation: a single-replica decode worker.
	sharedSpec := v1alpha1.DynamoComponentDeploymentSharedSpec{
		ServiceName:     "test-service",
		DynamoNamespace: ptr.To("default"),
		ComponentType:   string(commonconsts.ComponentTypeDecode),
		Replicas:        ptr.To(int32(1)),
		ExtraPodSpec: &v1alpha1.ExtraPodSpec{
			MainContainer: &corev1.Container{
				Image: "test-image:latest",
				Args:  []string{"--test-arg"},
			},
		},
	}
	component := &v1alpha1.DynamoComponentDeployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-component",
			Namespace: "default",
		},
		Spec: v1alpha1.DynamoComponentDeploymentSpec{
			BackendFramework:                    string(dynamo.BackendFrameworkVLLM),
			DynamoComponentDeploymentSharedSpec: sharedSpec,
		},
	}

	// Pre-existing Deployment: generation observed, replica rolled out, but the
	// Available condition is False and no replica is ready or available.
	notAvailable := appsv1.DeploymentCondition{
		Type:   appsv1.DeploymentAvailable,
		Status: corev1.ConditionFalse,
	}
	existingDeployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:       "test-component",
			Namespace:  "default",
			Generation: 1,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: ptr.To(int32(1)),
		},
		Status: appsv1.DeploymentStatus{
			ObservedGeneration: 1,
			Replicas:           1,
			UpdatedReplicas:    1,
			ReadyReplicas:      0,
			AvailableReplicas:  0,
			Conditions:         []appsv1.DeploymentCondition{notAvailable},
		},
	}

	kubeClient := fake.NewClientBuilder().
		WithScheme(testScheme).
		WithObjects(component, existingDeployment).
		WithStatusSubresource(component, existingDeployment).
		Build()

	// The retriever stub reports no image pull secrets for any image.
	noSecrets := &mockDockerSecretRetriever{
		GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
			return []string{}, nil
		},
	}
	reconciler := &DynamoComponentDeploymentReconciler{
		Client:                kubeClient,
		Recorder:              record.NewFakeRecorder(100),
		Config:                &configv1alpha1.OperatorConfiguration{},
		RuntimeConfig:         &controller_common.RuntimeConfig{},
		DockerSecretRetriever: noSecrets,
	}

	want := ComponentReconcileResult{
		modified: true,
		status:   metav1.ConditionFalse,
		reason:   "DeploymentNotReady",
		message:  "Deployment is not ready",
		serviceReplicaStatus: &v1alpha1.ServiceReplicaStatus{
			ComponentKind:     v1alpha1.ComponentKindDeployment,
			ComponentName:     "test-component",
			ComponentNames:    []string{"test-component"},
			Replicas:          1,
			UpdatedReplicas:   1,
			ReadyReplicas:     ptr.To(int32(0)),
			AvailableReplicas: ptr.To(int32(0)),
		},
	}

	got, err := reconciler.reconcileDeploymentResources(ctx, component)
	g.Expect(err).NotTo(gomega.HaveOccurred())
	g.Expect(got).To(gomega.Equal(want))
}
func Test_setStatusConditionAndServiceReplicaStatus(t *testing.T) {
ctx := context.Background()
......
......@@ -87,6 +87,7 @@ type DynamoGraphDeploymentReconciler struct {
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list
// +kubebuilder:rbac:groups=inference.networking.k8s.io,resources=inferencepools,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
......@@ -1202,12 +1203,15 @@ func (r *DynamoGraphDeploymentReconciler) reconcilePVCs(ctx context.Context, dyn
return nil
}
// reconcileCheckpoints reconciles Checkpoint CRs for services with checkpointing enabled
// For Auto mode, it creates Checkpoint CRs if they don't exist
// Returns a map of service names to checkpoint status and a map of service names to checkpoint info
func (r *DynamoGraphDeploymentReconciler) reconcileCheckpoints(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (map[string]nvidiacomv1alpha1.ServiceCheckpointStatus, map[string]*checkpoint.CheckpointInfo, error) {
// reconcileCheckpoints reconciles Checkpoint CRs for services with checkpointing enabled.
// For Auto mode, it creates Checkpoint CRs if they do not exist.
// Returns per-service checkpoint status and resolved checkpoint info.
func (r *DynamoGraphDeploymentReconciler) reconcileCheckpoints(
ctx context.Context,
dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment,
) (map[string]nvidiacomv1alpha1.ServiceCheckpointStatus, map[string]*checkpoint.CheckpointInfo, error) {
logger := log.FromContext(ctx)
statuses := make(map[string]nvidiacomv1alpha1.ServiceCheckpointStatus)
checkpointStatuses := make(map[string]nvidiacomv1alpha1.ServiceCheckpointStatus)
checkpointInfos := make(map[string]*checkpoint.CheckpointInfo)
for serviceName, component := range dynamoDeployment.Spec.Services {
......@@ -1227,8 +1231,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileCheckpoints(ctx context.Conte
// Store checkpoint info for later use in pod spec generation
checkpointInfos[serviceName] = info
// If no checkpoint found and mode is Auto, create one
if info.CheckpointName == "" && component.Checkpoint.Mode == nvidiacomv1alpha1.CheckpointModeAuto {
// checkpointRef is authoritative. Auto mode should only create the canonical checkpoint
// when the service is using identity-based lookup.
if component.Checkpoint.Mode == nvidiacomv1alpha1.CheckpointModeAuto &&
(component.Checkpoint.CheckpointRef == nil || *component.Checkpoint.CheckpointRef == "") &&
!info.Exists &&
info.Identity != nil &&
!info.Ready {
logger.Info("Creating DynamoCheckpoint CR in Auto mode", "service", serviceName)
ckpt, err := r.createCheckpointCR(ctx, dynamoDeployment, serviceName, component)
......@@ -1236,28 +1245,22 @@ func (r *DynamoGraphDeploymentReconciler) reconcileCheckpoints(ctx context.Conte
logger.Error(err, "Failed to create DynamoCheckpoint CR", "service", serviceName)
return nil, nil, fmt.Errorf("failed to create checkpoint for service %s: %w", serviceName, err)
}
info.Exists = true
info.CheckpointName = ckpt.Name
// Compute hash locally since status may not be populated yet
// (checkpoint controller reconciles asynchronously)
hash, err := checkpoint.ComputeIdentityHash(*component.Checkpoint.Identity)
if err != nil {
logger.Error(err, "Failed to compute checkpoint identity hash", "service", serviceName)
return nil, nil, fmt.Errorf("failed to compute checkpoint hash for service %s: %w", serviceName, err)
if info.Hash == "" {
info.Hash = ckpt.Status.IdentityHash
}
info.Hash = hash
info.Ready = false // Newly created checkpoint is not ready yet
info.Ready = false
}
// Update status
statuses[serviceName] = nvidiacomv1alpha1.ServiceCheckpointStatus{
checkpointStatuses[serviceName] = nvidiacomv1alpha1.ServiceCheckpointStatus{
CheckpointName: info.CheckpointName,
IdentityHash: info.Hash,
Ready: info.Ready,
}
}
return statuses, checkpointInfos, nil
return checkpointStatuses, checkpointInfos, nil
}
// createCheckpointCR creates a DynamoCheckpoint CR for a service in Auto mode
......@@ -1273,22 +1276,6 @@ func (r *DynamoGraphDeploymentReconciler) createCheckpointCR(
identity := component.Checkpoint.Identity
// Compute hash for naming
hash, err := checkpoint.ComputeIdentityHash(*identity)
if err != nil {
return nil, fmt.Errorf("failed to compute identity hash: %w", err)
}
// Generate checkpoint name: use hash directly (16 chars, 64 bits)
// This allows natural deduplication - same identity = same checkpoint name
// 16 characters provides excellent collision resistance (1% at 500M configs)
ckptName := hash
// Use SyncResource to create/update the DynamoCheckpoint CR
// Pass nil as parentResource to create an independent checkpoint (no owner reference)
// This ensures the checkpoint persists even if the DGD is deleted
_, ckpt, err := commoncontroller.SyncResource(ctx, r, nil, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoCheckpoint, bool, error) {
// Build the checkpoint identity from service identity
checkpointIdentity := nvidiacomv1alpha1.DynamoCheckpointIdentity{
Model: identity.Model,
BackendFramework: identity.BackendFramework,
......@@ -1300,43 +1287,25 @@ func (r *DynamoGraphDeploymentReconciler) createCheckpointCR(
ExtraParameters: identity.ExtraParameters,
}
// Build pod template from service spec for checkpoint job
// This uses GenerateBasePodSpec to ensure same config as worker pods (image pull secrets, etc.)
// Pass framework from checkpoint identity for accurate backend detection
// Capture config is not part of the checkpoint identity. Once a checkpoint object exists for a
// hash, later reconcilers must reuse it instead of racing to overwrite the capture pod template.
podTemplate, err := r.buildCheckpointJobPodTemplate(
dynamoDeployment,
component,
serviceName,
identity.BackendFramework, // Use framework from checkpoint identity
identity.BackendFramework,
)
if err != nil {
return nil, false, fmt.Errorf("failed to build checkpoint job pod template: %w", err)
}
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Name: ckptName,
Namespace: dynamoDeployment.Namespace,
Labels: map[string]string{
consts.KubeLabelDynamoGraphDeploymentName: dynamoDeployment.Name,
consts.KubeLabelDynamoComponent: serviceName,
consts.KubeLabelCheckpointHash: hash,
},
},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{
Identity: checkpointIdentity,
Job: nvidiacomv1alpha1.DynamoCheckpointJobConfig{
PodTemplateSpec: podTemplate,
},
},
}
return ckpt, false, nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync checkpoint CR: %w", err)
return nil, fmt.Errorf("failed to build checkpoint job pod template: %w", err)
}
return ckpt, nil
return checkpoint.CreateOrGetAutoCheckpoint(
ctx,
r.Client,
dynamoDeployment.Namespace,
checkpointIdentity,
podTemplate,
)
}
// buildCheckpointJobPodTemplate builds a pod template for the checkpoint job from service spec
......@@ -1603,6 +1572,7 @@ func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) err
GenericFunc: func(ge event.GenericEvent) bool { return false },
}),
)
}
// Wrap with metrics collection
observedReconciler := observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeployment)
......
......@@ -23,11 +23,13 @@ import (
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
"github.com/onsi/gomega"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
......@@ -342,6 +344,314 @@ func TestDynamoGraphDeploymentReconciler_reconcileScalingAdapters(t *testing.T)
}
}
// TestDynamoGraphDeploymentReconciler_createCheckpointCR_reusesExistingCapture
// seeds a DynamoCheckpoint named "existing-worker-checkpoint" and then calls
// createCheckpointCR for a service that requests the same model and backend
// framework but a different main-container image.
//
// It asserts that the returned checkpoint is the seeded one and that the stored
// job pod template still has exactly one container with the original image.
func TestDynamoGraphDeploymentReconciler_createCheckpointCR_reusesExistingCapture(t *testing.T) {
	const (
		seededName      = "existing-worker-checkpoint"
		seededNamespace = "default"
	)

	ctx := context.Background()
	if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil {
		t.Fatalf("Failed to add v1alpha1 to scheme: %v", err)
	}

	identity := v1alpha1.DynamoCheckpointIdentity{
		Model:            "meta-llama/Llama-2-7b-hf",
		BackendFramework: "vllm",
	}
	hash, err := checkpoint.ComputeIdentityHash(identity)
	if err != nil {
		t.Fatalf("Failed to compute checkpoint hash: %v", err)
	}

	// Checkpoint that is already present before the reconciler runs.
	seededJob := v1alpha1.DynamoCheckpointJobConfig{
		PodTemplateSpec: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{
					Name:  "main",
					Image: "keep-existing:latest",
				}},
			},
		},
	}
	seeded := &v1alpha1.DynamoCheckpoint{
		ObjectMeta: metav1.ObjectMeta{
			Name:      seededName,
			Namespace: seededNamespace,
		},
		Spec: v1alpha1.DynamoCheckpointSpec{
			Identity: identity,
			Job:      seededJob,
		},
		Status: v1alpha1.DynamoCheckpointStatus{
			IdentityHash: hash,
		},
	}

	kubeClient := fake.NewClientBuilder().
		WithScheme(scheme.Scheme).
		WithObjects(seeded).
		Build()
	reconciler := &DynamoGraphDeploymentReconciler{
		Client:   kubeClient,
		Config:   &configv1alpha1.OperatorConfiguration{},
		Recorder: record.NewFakeRecorder(10),
	}

	graph := &v1alpha1.DynamoGraphDeployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dgd",
			Namespace: seededNamespace,
		},
	}

	// NOTE(review): the requested identity spells out parallelism of 1 and an
	// empty parameter map; presumably that resolves to the seeded checkpoint's
	// identity - confirm against ComputeIdentityHash.
	requestedIdentity := &v1alpha1.DynamoCheckpointIdentity{
		Model:                identity.Model,
		BackendFramework:     identity.BackendFramework,
		TensorParallelSize:   1,
		PipelineParallelSize: 1,
		ExtraParameters:      map[string]string{},
	}
	service := &v1alpha1.DynamoComponentDeploymentSharedSpec{
		ComponentType: string(commonconsts.ComponentTypeWorker),
		Checkpoint: &v1alpha1.ServiceCheckpointConfig{
			Enabled:  true,
			Mode:     v1alpha1.CheckpointModeAuto,
			Identity: requestedIdentity,
		},
		ExtraPodSpec: &v1alpha1.ExtraPodSpec{
			MainContainer: &corev1.Container{
				Name:  "main",
				Image: "new-writer:latest",
			},
		},
	}

	returned, err := reconciler.createCheckpointCR(ctx, graph, "worker", service)
	if err != nil {
		t.Fatalf("createCheckpointCR() error = %v", err)
	}
	if returned.Name != seededName {
		t.Fatalf("createCheckpointCR() returned checkpoint %s, want existing-worker-checkpoint", returned.Name)
	}

	// Re-read the stored object to confirm its job template was not rewritten.
	stored := &v1alpha1.DynamoCheckpoint{}
	key := types.NamespacedName{Name: seededName, Namespace: seededNamespace}
	err = reconciler.Get(ctx, key, stored)
	if err != nil {
		t.Fatalf("Failed to get checkpoint: %v", err)
	}
	containers := stored.Spec.Job.PodTemplateSpec.Spec.Containers
	if count := len(containers); count != 1 {
		t.Fatalf("expected one job container, got %d", count)
	}
	if image := containers[0].Image; image != "keep-existing:latest" {
		t.Fatalf("existing job image was mutated to %s", image)
	}
}
// TestDynamoGraphDeploymentReconciler_reconcileCheckpoints_checkpointRefSkipsAutoCreateWhileReferencedCRIsNotReady
// runs reconcileCheckpoints for a service in Auto mode that points at a
// checkpoint via CheckpointRef while that checkpoint is still in the Creating
// phase.
//
// It asserts that the resolved info is "exists, not ready" with the seeded
// identity hash, that the per-service status names "friendly-checkpoint", and
// that the namespace still holds only that single checkpoint afterwards.
func TestDynamoGraphDeploymentReconciler_reconcileCheckpoints_checkpointRefSkipsAutoCreateWhileReferencedCRIsNotReady(t *testing.T) {
	ctx := context.Background()
	if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil {
		t.Fatalf("Failed to add v1alpha1 to scheme: %v", err)
	}

	identity := v1alpha1.DynamoCheckpointIdentity{
		Model:            "meta-llama/Llama-2-7b-hf",
		BackendFramework: "vllm",
	}
	hash, err := checkpoint.ComputeIdentityHash(identity)
	if err != nil {
		t.Fatalf("Failed to compute checkpoint hash: %v", err)
	}

	// The user-named checkpoint that the service refers to; capture is still
	// in progress.
	seededJob := v1alpha1.DynamoCheckpointJobConfig{
		PodTemplateSpec: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{
					Name:  "main",
					Image: "keep-existing:latest",
				}},
			},
		},
	}
	seeded := &v1alpha1.DynamoCheckpoint{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "friendly-checkpoint",
			Namespace: "default",
		},
		Spec: v1alpha1.DynamoCheckpointSpec{
			Identity: identity,
			Job:      seededJob,
		},
		Status: v1alpha1.DynamoCheckpointStatus{
			Phase:        v1alpha1.DynamoCheckpointPhaseCreating,
			IdentityHash: hash,
		},
	}

	kubeClient := fake.NewClientBuilder().
		WithScheme(scheme.Scheme).
		WithObjects(seeded).
		WithStatusSubresource(seeded).
		Build()
	reconciler := &DynamoGraphDeploymentReconciler{
		Client:   kubeClient,
		Config:   &configv1alpha1.OperatorConfiguration{},
		Recorder: record.NewFakeRecorder(10),
	}

	// NOTE(review): friendlyCheckpointName is declared outside this function;
	// presumably it equals "friendly-checkpoint" - confirm.
	ref := friendlyCheckpointName
	worker := &v1alpha1.DynamoComponentDeploymentSharedSpec{
		ComponentType: string(commonconsts.ComponentTypeWorker),
		Checkpoint: &v1alpha1.ServiceCheckpointConfig{
			Enabled:       true,
			Mode:          v1alpha1.CheckpointModeAuto,
			CheckpointRef: &ref,
		},
	}
	graph := &v1alpha1.DynamoGraphDeployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dgd",
			Namespace: "default",
		},
		Spec: v1alpha1.DynamoGraphDeploymentSpec{
			Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
				"worker": worker,
			},
		},
	}

	checkpointStatuses, checkpointInfos, err := reconciler.reconcileCheckpoints(ctx, graph)
	if err != nil {
		t.Fatalf("reconcileCheckpoints() error = %v", err)
	}

	info, found := checkpointInfos["worker"]
	if !found {
		t.Fatalf("expected checkpoint info for worker service")
	}
	// Cases are evaluated top to bottom, so the first violated expectation is
	// the one reported.
	switch {
	case info.Ready:
		t.Fatalf("expected referenced checkpoint to remain not ready")
	case !info.Exists:
		t.Fatalf("expected referenced checkpoint to exist")
	case info.Hash != hash:
		t.Fatalf("checkpoint hash = %s, want %s", info.Hash, hash)
	}
	if name := checkpointStatuses["worker"].CheckpointName; name != "friendly-checkpoint" {
		t.Fatalf("checkpoint status name = %s, want friendly-checkpoint", name)
	}

	// No additional checkpoint may have been created next to the referenced one.
	checkpoints := &v1alpha1.DynamoCheckpointList{}
	err = reconciler.List(ctx, checkpoints, client.InNamespace("default"))
	if err != nil {
		t.Fatalf("failed to list checkpoints: %v", err)
	}
	items := checkpoints.Items
	if count := len(items); count != 1 {
		t.Fatalf("expected only the referenced checkpoint to exist, found %d", count)
	}
	if name := items[0].Name; name != "friendly-checkpoint" {
		t.Fatalf("unexpected checkpoint %s", name)
	}
}
// TestDynamoGraphDeploymentReconciler_reconcileCheckpoints_autoModeWaitsForExistingCreatingCheckpoint
// runs reconcileCheckpoints for an Auto-mode service selected by identity while
// a checkpoint with the same identity already exists in the Creating phase.
//
// It asserts that the resolved info is "exists, not ready" with the seeded
// identity hash, that the per-service status names the seeded checkpoint, and
// that the seeded checkpoint's job pod template keeps its original image even
// though the service specifies a different main-container image.
func TestDynamoGraphDeploymentReconciler_reconcileCheckpoints_autoModeWaitsForExistingCreatingCheckpoint(t *testing.T) {
	const (
		seededName      = "existing-worker-checkpoint"
		seededNamespace = "default"
	)

	ctx := context.Background()
	if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil {
		t.Fatalf("Failed to add v1alpha1 to scheme: %v", err)
	}

	identity := v1alpha1.DynamoCheckpointIdentity{
		Model:            "meta-llama/Llama-2-7b-hf",
		BackendFramework: "vllm",
	}
	hash, err := checkpoint.ComputeIdentityHash(identity)
	if err != nil {
		t.Fatalf("Failed to compute checkpoint hash: %v", err)
	}

	// Checkpoint already present and still being captured.
	seededJob := v1alpha1.DynamoCheckpointJobConfig{
		PodTemplateSpec: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{
					Name:  "main",
					Image: "keep-existing:latest",
				}},
			},
		},
	}
	seeded := &v1alpha1.DynamoCheckpoint{
		ObjectMeta: metav1.ObjectMeta{
			Name:      seededName,
			Namespace: seededNamespace,
		},
		Spec: v1alpha1.DynamoCheckpointSpec{
			Identity: identity,
			Job:      seededJob,
		},
		Status: v1alpha1.DynamoCheckpointStatus{
			Phase:        v1alpha1.DynamoCheckpointPhaseCreating,
			IdentityHash: hash,
		},
	}

	kubeClient := fake.NewClientBuilder().
		WithScheme(scheme.Scheme).
		WithObjects(seeded).
		WithStatusSubresource(seeded).
		Build()
	reconciler := &DynamoGraphDeploymentReconciler{
		Client:   kubeClient,
		Config:   &configv1alpha1.OperatorConfiguration{},
		Recorder: record.NewFakeRecorder(10),
	}

	// The service asks for the same identity but would capture with a
	// different image if it were allowed to write the checkpoint.
	worker := &v1alpha1.DynamoComponentDeploymentSharedSpec{
		ComponentType: string(commonconsts.ComponentTypeWorker),
		Checkpoint: &v1alpha1.ServiceCheckpointConfig{
			Enabled: true,
			Mode:    v1alpha1.CheckpointModeAuto,
			Identity: &v1alpha1.DynamoCheckpointIdentity{
				Model:            identity.Model,
				BackendFramework: identity.BackendFramework,
			},
		},
		ExtraPodSpec: &v1alpha1.ExtraPodSpec{
			MainContainer: &corev1.Container{
				Name:  "main",
				Image: "new-writer:latest",
			},
		},
	}
	graph := &v1alpha1.DynamoGraphDeployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-dgd",
			Namespace: seededNamespace,
		},
		Spec: v1alpha1.DynamoGraphDeploymentSpec{
			Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
				"worker": worker,
			},
		},
	}

	checkpointStatuses, checkpointInfos, err := reconciler.reconcileCheckpoints(ctx, graph)
	if err != nil {
		t.Fatalf("reconcileCheckpoints() error = %v", err)
	}

	info, found := checkpointInfos["worker"]
	if !found {
		t.Fatalf("expected checkpoint info for worker service")
	}
	// Cases are evaluated top to bottom, so the first violated expectation is
	// the one reported.
	switch {
	case info.Ready:
		t.Fatalf("expected existing checkpoint to remain not ready")
	case !info.Exists:
		t.Fatalf("expected existing checkpoint to be detected")
	case info.Hash != hash:
		t.Fatalf("checkpoint hash = %s, want %s", info.Hash, hash)
	}
	if name := checkpointStatuses["worker"].CheckpointName; name != seededName {
		t.Fatalf("checkpoint status name = %s, want existing-worker-checkpoint", name)
	}

	// Re-read the stored object to confirm its job template was not rewritten.
	stored := &v1alpha1.DynamoCheckpoint{}
	key := types.NamespacedName{Name: seededName, Namespace: seededNamespace}
	err = reconciler.Get(ctx, key, stored)
	if err != nil {
		t.Fatalf("Failed to get checkpoint: %v", err)
	}
	containers := stored.Spec.Job.PodTemplateSpec.Spec.Containers
	if count := len(containers); count != 1 {
		t.Fatalf("expected one job container, got %d", count)
	}
	if image := containers[0].Image; image != "keep-existing:latest" {
		t.Fatalf("existing job image was mutated to %s", image)
	}
}
// mockScaleClient implements scale.ScalesGetter for testing
type mockScaleClient struct{}
......
......@@ -28,7 +28,6 @@ import (
istioNetworking "istio.io/api/networking/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
......@@ -928,8 +927,9 @@ func IsWorkerComponent(componentType string) bool {
componentType == commonconsts.ComponentTypeDecode
}
// addStandardEnvVars adds the standard environment variables that are common to both Grove and Controller
func addStandardEnvVars(container *corev1.Container, operatorConfig *configv1alpha1.OperatorConfiguration) {
// AddStandardEnvVars adds the standard environment variables that are common to
// both checkpoint jobs and generated worker pods.
func AddStandardEnvVars(container *corev1.Container, operatorConfig *configv1alpha1.OperatorConfiguration) {
standardEnvVars := []corev1.EnvVar{}
if operatorConfig.Infrastructure.NATSAddress != "" {
standardEnvVars = append(standardEnvVars, corev1.EnvVar{
......@@ -1077,7 +1077,7 @@ func GenerateBasePodSpec(
})
}
addStandardEnvVars(&container, operatorConfig)
AddStandardEnvVars(&container, operatorConfig)
volumes := make([]corev1.Volume, 0, len(component.VolumeMounts)+1) // +1 for shared memory volume
......@@ -1113,11 +1113,6 @@ func GenerateBasePodSpec(
MountPath: mountPoint,
})
}
if shmVol, shmMount := generateSharedMemoryVolumeAndMount(component.SharedMemory); shmVol != nil && shmMount != nil {
volumes = append(volumes, *shmVol)
container.VolumeMounts = append(container.VolumeMounts, *shmMount)
}
// Apply backend-specific container modifications
multinodeDeployer := MultinodeDeployerFactory(multinodeDeploymentType)
if multinodeDeployer == nil {
......@@ -1161,8 +1156,9 @@ func GenerateBasePodSpec(
}
}
podSpec.Containers = append(podSpec.Containers, container)
podSpec.Volumes = append(podSpec.Volumes, volumes...)
ApplySharedMemoryVolumeAndMount(&podSpec, &container, component.SharedMemory)
podSpec.Containers = append(podSpec.Containers, container)
podSpec.ImagePullSecrets = controller_common.AppendUniqueImagePullSecrets(podSpec.ImagePullSecrets, imagePullSecrets)
backend.UpdatePodSpec(&podSpec, numberOfNodes, role, component, serviceName, multinodeDeployer)
......@@ -1171,7 +1167,7 @@ func GenerateBasePodSpec(
// This handles ALL checkpoint-related modifications:
// - Command/Args transformation (moves Command to Args to respect image ENTRYPOINT)
// - Security context (hostIPC, privileged mode)
// - Environment variables (checkpoint path, hash, CRIU settings)
// - Restore/checkpoint pod metadata (labels/annotations)
// - Storage configuration (volumes, mounts)
// CheckpointInfo should have been resolved by ResolveCheckpointForService before calling this function
// Checkpoint config comes from the operator's controller config (Helm values)
......@@ -1215,7 +1211,6 @@ func setMetricsLabels(labels map[string]string, dynamoGraphDeployment *v1alpha1.
func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentSharedSpec, parentGraphDeploymentName string, namespace string, numberOfNodes int32, discoveryBackend configv1alpha1.DiscoveryBackend) ComponentContext {
dynamoNamespace := v1alpha1.ComputeDynamoNamespace(component.GlobalDynamoNamespace, namespace, parentGraphDeploymentName)
var workerHashSuffix string
if IsWorkerComponent(component.ComponentType) && component.Labels[commonconsts.KubeLabelDynamoWorkerHash] != "" {
workerHashSuffix = component.Labels[commonconsts.KubeLabelDynamoWorkerHash]
......@@ -1277,7 +1272,7 @@ func generateFrontendSidecar(
container.Env = MergeEnvs(container.Env, spec.Envs)
}
addStandardEnvVars(&container, operatorConfig)
AddStandardEnvVars(&container, operatorConfig)
return container, nil
}
......@@ -1423,7 +1418,7 @@ func GenerateGrovePodCliqueSet(
PodSpec: *podSpec,
},
}
labels, err := generateLabels(component, dynamoDeployment, serviceName, checkpointInfo)
labels, err := generateLabels(component, dynamoDeployment, serviceName)
if err != nil {
return nil, fmt.Errorf("failed to generate labels: %w", err)
}
......@@ -1432,6 +1427,7 @@ func GenerateGrovePodCliqueSet(
if err != nil {
return nil, fmt.Errorf("failed to generate annotations: %w", err)
}
checkpoint.ApplyRestorePodMetadata(labels, annotations, checkpointInfo)
// Apply restart annotation if this service should be restarted.
// For services not in the current restart order, preserve their existing annotation
......@@ -1481,7 +1477,6 @@ func generateLabels(
component *v1alpha1.DynamoComponentDeploymentSharedSpec,
dynamoDeployment *v1alpha1.DynamoGraphDeployment,
componentName string,
checkpointInfo *checkpoint.CheckpointInfo,
) (map[string]string, error) {
labels := make(map[string]string)
labels[commonconsts.KubeLabelDynamoSelector] = GetDCDResourceName(dynamoDeployment, componentName, "")
......@@ -1510,18 +1505,15 @@ func generateLabels(
return nil, fmt.Errorf("failed to merge extraPodMetadata labels: %w", err)
}
}
// Inject checkpoint labels AFTER user labels so they cannot be overridden.
var err error
labels, err = checkpoint.InjectCheckpointLabelsFromConfig(labels, component.Checkpoint)
if err != nil {
return nil, fmt.Errorf("failed to inject checkpoint labels: %w", err)
labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = dynamoDeployment.Name
if component.ComponentType != "" {
labels[commonconsts.KubeLabelDynamoComponentType] = component.ComponentType
}
// Only mark pods as restore targets when a concrete checkpoint is ready.
if checkpointInfo != nil && checkpointInfo.Enabled && checkpointInfo.Ready {
labels[commonconsts.KubeLabelIsRestoreTarget] = "true"
labels[commonconsts.KubeLabelCheckpointHash] = checkpointInfo.Hash
if component.DynamoNamespace != nil {
labels[commonconsts.KubeLabelDynamoNamespace] = *component.DynamoNamespace
}
if workerHash := component.Labels[commonconsts.KubeLabelDynamoWorkerHash]; workerHash != "" {
labels[commonconsts.KubeLabelDynamoWorkerHash] = workerHash
}
return labels, nil
}
......@@ -1706,8 +1698,10 @@ func GenerateBasePodSpecForController(
}
// Generate base PodSpec with standard env vars using merged component envs
// For controller usage, we may not have serviceName, so use the component name as fallback
serviceName := dynComponent.Name
serviceName := dynComponent.Spec.ServiceName
if serviceName == "" {
serviceName = dynComponent.Name
}
podSpec, err := GenerateBasePodSpec(
componentSpec,
backendFramework,
......@@ -1742,30 +1736,3 @@ func getDefaultCompilationCacheMountPoint(backendFramework BackendFramework) str
return ""
}
}
// generateSharedMemoryVolumeAndMount builds the memory-backed emptyDir volume
// and the matching volume mount described by spec.
//
// A nil spec, or a spec whose Size is zero, uses
// commonconsts.DefaultSharedMemorySize as the size limit. When spec.Disabled
// is set, both return values are nil and nothing should be mounted.
func generateSharedMemoryVolumeAndMount(spec *v1alpha1.SharedMemorySpec) (*corev1.Volume, *corev1.VolumeMount) {
	// Start from the default size; it is only replaced by an explicit, non-zero request.
	sizeLimit := resource.MustParse(commonconsts.DefaultSharedMemorySize)
	if spec != nil && spec.Disabled {
		return nil, nil
	}
	if spec != nil && !spec.Size.IsZero() {
		sizeLimit = spec.Size
	}

	shmVolume := &corev1.Volume{Name: commonconsts.KubeValueNameSharedMemory}
	shmVolume.EmptyDir = &corev1.EmptyDirVolumeSource{
		Medium:    corev1.StorageMediumMemory,
		SizeLimit: &sizeLimit,
	}

	shmMount := &corev1.VolumeMount{
		Name:      commonconsts.KubeValueNameSharedMemory,
		MountPath: commonconsts.DefaultSharedMemoryMountPath,
	}
	return shmVolume, shmMount
}
......@@ -28,11 +28,13 @@ import (
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
......@@ -6815,6 +6817,104 @@ func TestGenerateGrovePodCliqueSet_RestartAnnotations(t *testing.T) {
}
}
// TestGenerateLabels_RemovesStaleRestoreLabelsWhenCheckpointNotReady checks that
// restore-target and checkpoint-hash labels supplied through component labels
// or extra pod metadata are absent once generateLabels has run and
// ApplyRestorePodMetadata is applied with a checkpoint that is enabled but not
// ready, while unrelated user labels are kept.
func TestGenerateLabels_RemovesStaleRestoreLabelsWhenCheckpointNotReady(t *testing.T) {
	component := &v1alpha1.DynamoComponentDeploymentSharedSpec{
		ComponentType:   commonconsts.ComponentTypeWorker,
		DynamoNamespace: ptr.To("default-test-dgd"),
		Labels: map[string]string{
			"user-label":                          "keep",
			commonconsts.KubeLabelIsRestoreTarget: commonconsts.KubeLabelValueTrue,
		},
		ExtraPodMetadata: &v1alpha1.ExtraPodMetadata{
			Labels: map[string]string{
				"extra-label":                        "keep-too",
				commonconsts.KubeLabelCheckpointHash: "stale-hash",
			},
		},
	}
	deployment := &v1alpha1.DynamoGraphDeployment{
		ObjectMeta: metav1.ObjectMeta{Name: "test-dgd"},
	}

	labels, err := generateLabels(component, deployment, "Worker")
	require.NoError(t, err)

	notReady := &checkpoint.CheckpointInfo{
		Enabled: true,
		Ready:   false,
		Hash:    "resolved-hash",
	}
	checkpoint.ApplyRestorePodMetadata(labels, map[string]string{}, notReady)

	// Labels unrelated to restore must survive untouched.
	assert.Equal(t, "keep", labels["user-label"])
	assert.Equal(t, "keep-too", labels["extra-label"])

	// Neither restore label may be present while the checkpoint is not ready.
	for _, restoreKey := range []string{
		commonconsts.KubeLabelIsRestoreTarget,
		commonconsts.KubeLabelCheckpointHash,
	} {
		_, present := labels[restoreKey]
		assert.False(t, present)
	}
}
// TestGenerateLabels_OverwritesStaleRestoreLabelsWhenCheckpointReady checks that
// stale restore labels supplied by the user are replaced with the resolved
// values once generateLabels has run and ApplyRestorePodMetadata is applied
// with a checkpoint that is both enabled and ready.
func TestGenerateLabels_OverwritesStaleRestoreLabelsWhenCheckpointReady(t *testing.T) {
	component := &v1alpha1.DynamoComponentDeploymentSharedSpec{
		ComponentType:   commonconsts.ComponentTypeWorker,
		DynamoNamespace: ptr.To("default-test-dgd"),
		Labels: map[string]string{
			commonconsts.KubeLabelIsRestoreTarget: "false",
		},
		ExtraPodMetadata: &v1alpha1.ExtraPodMetadata{
			Labels: map[string]string{
				commonconsts.KubeLabelCheckpointHash: "stale-hash",
			},
		},
	}
	deployment := &v1alpha1.DynamoGraphDeployment{
		ObjectMeta: metav1.ObjectMeta{Name: "test-dgd"},
	}

	labels, err := generateLabels(component, deployment, "Worker")
	require.NoError(t, err)

	ready := &checkpoint.CheckpointInfo{
		Enabled: true,
		Ready:   true,
		Hash:    "resolved-hash",
	}
	checkpoint.ApplyRestorePodMetadata(labels, map[string]string{}, ready)

	// The stale user-provided values must be replaced by the resolved ones.
	assert.Equal(t, commonconsts.KubeLabelValueTrue, labels[commonconsts.KubeLabelIsRestoreTarget])
	assert.Equal(t, "resolved-hash", labels[commonconsts.KubeLabelCheckpointHash])
}
// TestGenerateLabels_ReassertsRestoreIdentityLabelsAfterMetadataMerge checks that
// the identity labels returned by generateLabels (namespace, component type,
// graph deployment name, worker hash) reflect the component spec and the
// deployment even when component labels and extra pod metadata carry
// conflicting values for the same keys.
func TestGenerateLabels_ReassertsRestoreIdentityLabelsAfterMetadataMerge(t *testing.T) {
	component := &v1alpha1.DynamoComponentDeploymentSharedSpec{
		ComponentType:   commonconsts.ComponentTypeWorker,
		DynamoNamespace: ptr.To("default-test-dgd"),
		Labels: map[string]string{
			commonconsts.KubeLabelDynamoNamespace:           "wrong-from-labels",
			commonconsts.KubeLabelDynamoComponentType:       commonconsts.ComponentTypeFrontend,
			commonconsts.KubeLabelDynamoGraphDeploymentName: "wrong-from-labels",
			commonconsts.KubeLabelDynamoWorkerHash:          "workerhash",
		},
		ExtraPodMetadata: &v1alpha1.ExtraPodMetadata{
			Labels: map[string]string{
				commonconsts.KubeLabelDynamoNamespace:           "wrong-from-extra-metadata",
				commonconsts.KubeLabelDynamoComponentType:       commonconsts.ComponentTypePlanner,
				commonconsts.KubeLabelDynamoGraphDeploymentName: "wrong-from-extra-metadata",
				commonconsts.KubeLabelDynamoWorkerHash:          "wrong-from-extra-metadata",
			},
		},
	}
	deployment := &v1alpha1.DynamoGraphDeployment{
		ObjectMeta: metav1.ObjectMeta{Name: "test-dgd"},
	}

	labels, err := generateLabels(component, deployment, "Worker")
	require.NoError(t, err)

	// Checked in a fixed order so failures are reported deterministically.
	expectations := []struct {
		key  string
		want string
	}{
		{commonconsts.KubeLabelDynamoNamespace, "default-test-dgd"},
		{commonconsts.KubeLabelDynamoComponentType, commonconsts.ComponentTypeWorker},
		{commonconsts.KubeLabelDynamoGraphDeploymentName, "test-dgd"},
		{commonconsts.KubeLabelDynamoWorkerHash, "workerhash"},
	}
	for _, expectation := range expectations {
		assert.Equal(t, expectation.want, labels[expectation.key])
	}
}
func TestIsWorkerComponent(t *testing.T) {
workers := []string{commonconsts.ComponentTypeWorker, commonconsts.ComponentTypePrefill, commonconsts.ComponentTypeDecode}
nonWorkers := []string{commonconsts.ComponentTypeFrontend, commonconsts.ComponentTypePlanner, commonconsts.ComponentTypeEPP, "custom", ""}
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dynamo
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
)
// buildSharedMemoryVolumeAndMount returns the memory-backed emptyDir volume and
// its volume mount for the given shared-memory spec.
//
// The size limit is commonconsts.DefaultSharedMemorySize unless spec carries a
// non-zero Size. If spec.Disabled is true, (nil, nil) is returned.
func buildSharedMemoryVolumeAndMount(spec *v1alpha1.SharedMemorySpec) (*corev1.Volume, *corev1.VolumeMount) {
	sizeLimit := resource.MustParse(commonconsts.DefaultSharedMemorySize)

	switch {
	case spec == nil:
		// No spec: keep the default size.
	case spec.Disabled:
		return nil, nil
	case !spec.Size.IsZero():
		sizeLimit = spec.Size
	}

	emptyDir := &corev1.EmptyDirVolumeSource{
		Medium:    corev1.StorageMediumMemory,
		SizeLimit: &sizeLimit,
	}
	shmVolume := &corev1.Volume{
		Name:         commonconsts.KubeValueNameSharedMemory,
		VolumeSource: corev1.VolumeSource{EmptyDir: emptyDir},
	}
	shmMount := &corev1.VolumeMount{
		Name:      commonconsts.KubeValueNameSharedMemory,
		MountPath: commonconsts.DefaultSharedMemoryMountPath,
	}
	return shmVolume, shmMount
}
// ApplySharedMemoryVolumeAndMount adds the shared-memory volume built from spec
// to podSpec and mounts it into mainContainer.
//
// Any volume already on podSpec with the same name is replaced. Any mount
// already on mainContainer that uses the same volume name or the same mount
// path is replaced as well, so the call never leaves two entries competing for
// the shared-memory name or path.
//
// The call is a no-op when podSpec or mainContainer is nil, or when spec
// disables shared memory; in the disabled case existing volumes and mounts are
// left untouched.
func ApplySharedMemoryVolumeAndMount(podSpec *corev1.PodSpec, mainContainer *corev1.Container, spec *v1alpha1.SharedMemorySpec) {
	// Guard against nil targets instead of panicking on dereference below.
	if podSpec == nil || mainContainer == nil {
		return
	}

	volume, volumeMount := buildSharedMemoryVolumeAndMount(spec)
	if volume == nil || volumeMount == nil {
		return
	}

	// Rebuild into fresh slices so the caller's backing arrays are not mutated.
	volumes := make([]corev1.Volume, 0, len(podSpec.Volumes)+1)
	for _, existingVolume := range podSpec.Volumes {
		if existingVolume.Name != volume.Name {
			volumes = append(volumes, existingVolume)
		}
	}
	podSpec.Volumes = append(volumes, *volume)

	mounts := make([]corev1.VolumeMount, 0, len(mainContainer.VolumeMounts)+1)
	for _, existingMount := range mainContainer.VolumeMounts {
		// Keep only mounts that conflict on neither the volume name nor the path.
		if existingMount.Name != volumeMount.Name && existingMount.MountPath != volumeMount.MountPath {
			mounts = append(mounts, existingMount)
		}
	}
	mainContainer.VolumeMounts = append(mounts, *volumeMount)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment