dgd_integration.go 15.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package checkpoint

import (
	"context"
	"fmt"

24
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// getCheckpointInfoFromCheckpoint builds a CheckpointInfo from a DynamoCheckpoint CR.
//
// The result is always Enabled. Hash, Location and StorageType are copied from
// the CR status, and Ready is true only when the status phase is Ready.
// Identity points at the CR's own Spec.Identity (it is not a copy).
func getCheckpointInfoFromCheckpoint(ckpt *nvidiacomv1alpha1.DynamoCheckpoint) *CheckpointInfo {
	st := &ckpt.Status
	isReady := st.Phase == nvidiacomv1alpha1.DynamoCheckpointPhaseReady

	return &CheckpointInfo{
		Enabled:        true,
		Identity:       &ckpt.Spec.Identity,
		Hash:           st.IdentityHash,
		Location:       st.Location,
		StorageType:    st.StorageType,
		CheckpointName: ckpt.Name,
		Ready:          isReady,
	}
}

48
// getPVCBasePath returns the PVC base path from storage config.
49
// Only applicable for PVC storage type
50
func getPVCBasePath(storageConfig *configv1alpha1.CheckpointStorageConfiguration) string {
51
52
53
	if storageConfig != nil && storageConfig.PVC.BasePath != "" {
		return storageConfig.PVC.BasePath
	}
54
	return ""
55
56
}

57
58
// GetPVCBasePath returns the configured PVC base path from controller config.
// This is used by both CheckpointReconciler and DynamoGraphDeploymentReconciler.
59
// Only applicable for PVC storage type.
60
func GetPVCBasePath(config *configv1alpha1.CheckpointConfiguration) string {
61
	if config != nil {
62
63
		return getPVCBasePath(&config.Storage)
	}
64
	return ""
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
}

// CheckpointInfo contains resolved checkpoint information for a DGD service.
// It is produced by ResolveCheckpointForService (or getCheckpointInfoFromCheckpoint)
// and consumed by InjectCheckpointIntoPodSpec, which also fills in some of its
// fields (Hash when empty, StorageType, Location).
type CheckpointInfo struct {
	// Enabled indicates if checkpointing is enabled. When false,
	// InjectCheckpointIntoPodSpec and InjectCheckpointEnvVars do nothing.
	Enabled bool
	// Identity is the resolved checkpoint identity (model, framework, etc.).
	// It is a pointer into either the service config or a DynamoCheckpoint CR's
	// Spec.Identity, not a copy.
	Identity *nvidiacomv1alpha1.DynamoCheckpointIdentity
	// Hash is the identity hash (from ComputeIdentityHash or the CR's
	// Status.IdentityHash). It is used to look up CRs by the checkpoint hash
	// label and is exported to the container via consts.EnvCheckpointHash.
	Hash string
	// Location is the full URI/path in the storage backend. It starts as the CR's
	// Status.Location (if a CR was found). InjectCheckpointIntoPodSpec then
	// overwrites it with "<basePath>/<hash>" (PVC), "<s3 uri>/<hash>.tar" (S3)
	// or "<oci uri>:<hash>" (OCI).
	Location string
	// StorageType is the storage backend type (pvc, s3, oci).
	StorageType nvidiacomv1alpha1.DynamoCheckpointStorageType
	// CheckpointName is the name of the DynamoCheckpoint CR; empty when no CR was found.
	CheckpointName string
	// Ready indicates if the checkpoint is ready for use (CR phase is Ready).
	// When true, InjectCheckpointIntoPodSpec replaces the main container's
	// command with `sleep infinity` so the restore DaemonSet can take over.
	Ready bool
}

// ResolveCheckpointForService resolves checkpoint information for a DGD service.
//
// Resolution order:
//  1. config is nil or disabled: returns CheckpointInfo{Enabled: false}.
//  2. config.CheckpointRef is set: fetches that DynamoCheckpoint CR from namespace
//     and returns its info. Identity, hash, location and readiness all come from the CR.
//  3. Otherwise config.Identity is required. Its hash is computed and the
//     DynamoCheckpoint CRs in namespace carrying that hash label are listed.
//     - If any are found, one is chosen (a Ready one is preferred) and its name,
//       location, storage type and readiness are used. Hash and Identity keep the
//       values computed from config.
//     - If none are found, an Enabled info with only Identity and Hash set is
//       returned.
//
// An error is returned when the referenced CR cannot be fetched, when neither
// checkpointRef nor identity is provided, or when hashing or listing fails.
func ResolveCheckpointForService(
	ctx context.Context,
	c client.Client,
	namespace string,
	config *nvidiacomv1alpha1.ServiceCheckpointConfig,
) (*CheckpointInfo, error) {
	if config == nil || !config.Enabled {
		return &CheckpointInfo{Enabled: false}, nil
	}

	// If a direct checkpoint reference is provided, use it
	if config.CheckpointRef != nil && *config.CheckpointRef != "" {
		ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{}
		err := c.Get(ctx, types.NamespacedName{
			Namespace: namespace,
			Name:      *config.CheckpointRef,
		}, ckpt)
		if err != nil {
			return nil, fmt.Errorf("failed to get referenced checkpoint %s: %w", *config.CheckpointRef, err)
		}

		// Extract all checkpoint info including identity from the CR
		return getCheckpointInfoFromCheckpoint(ckpt), nil
	}

	// Otherwise, compute hash from identity and look up checkpoint
	if config.Identity == nil {
		return nil, fmt.Errorf("checkpoint enabled but no checkpointRef or identity provided")
	}

	hash, err := ComputeIdentityHash(*config.Identity)
	if err != nil {
		return nil, fmt.Errorf("failed to compute identity hash: %w", err)
	}

	// Look for existing checkpoints with matching hash using label selector
	checkpointList := &nvidiacomv1alpha1.DynamoCheckpointList{}
	if err = c.List(ctx, checkpointList,
		client.InNamespace(namespace),
		client.MatchingLabels{consts.KubeLabelCheckpointHash: hash},
	); err != nil {
		return nil, fmt.Errorf("failed to list checkpoints: %w", err)
	}

	if len(checkpointList.Items) > 0 {
		// There should be at most one CR per hash. If several exist, the list
		// order is not something to rely on, so prefer a Ready checkpoint (which
		// allows a restore instead of a cold start) and otherwise take the first.
		ckpt := &checkpointList.Items[0]
		for i := range checkpointList.Items {
			if checkpointList.Items[i].Status.Phase == nvidiacomv1alpha1.DynamoCheckpointPhaseReady {
				ckpt = &checkpointList.Items[i]
				break
			}
		}

		foundInfo := getCheckpointInfoFromCheckpoint(ckpt)
		// Keep the hash and identity we computed from the config
		foundInfo.Hash = hash
		foundInfo.Identity = config.Identity
		return foundInfo, nil
	}

	// No existing checkpoint found.
	// In Auto mode, the controller should create one.
	return &CheckpointInfo{
		Enabled:  true,
		Identity: config.Identity,
		Hash:     hash,
	}, nil
}

154
// InjectCheckpointEnvVars adds checkpoint-related environment variables to a restored/DGD container.
155
// Sets PATH and HASH so the restored process knows its checkpoint identity.
156
// DYN_CHECKPOINT_LOCATION is reserved for future S3/OCI support.
157
func InjectCheckpointEnvVars(container *corev1.Container, info *CheckpointInfo, checkpointConfig *configv1alpha1.CheckpointConfiguration) {
158
159
160
161
	if !info.Enabled {
		return
	}

162
	var envVars []corev1.EnvVar
163

164
	// For PVC storage: inject base path so the restored process knows its checkpoint location.
165
	// For S3/OCI (future): inject DYN_CHECKPOINT_LOCATION directly.
166
	storageType := configv1alpha1.CheckpointStorageTypePVC
167
168
	if checkpointConfig != nil && checkpointConfig.Storage.Type != "" {
		storageType = checkpointConfig.Storage.Type
169
170
	}

171
	switch storageType {
172
	case configv1alpha1.CheckpointStorageTypePVC:
173
174
175
176
		basePath := ""
		if checkpointConfig != nil {
			basePath = getPVCBasePath(&checkpointConfig.Storage)
		}
177
178
179
180
		envVars = append(envVars, corev1.EnvVar{
			Name:  consts.EnvCheckpointPath,
			Value: basePath,
		})
181
182
183
184
185
186
187
188
	default:
		// S3/OCI: inject full location URI directly
		if info.Location != "" {
			envVars = append(envVars, corev1.EnvVar{
				Name:  consts.EnvCheckpointLocation,
				Value: info.Location,
			})
		}
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
	}

	if info.Hash != "" {
		envVars = append(envVars, corev1.EnvVar{
			Name:  consts.EnvCheckpointHash,
			Value: info.Hash,
		})
	}

	// Prepend checkpoint env vars to ensure they're available
	container.Env = append(envVars, container.Env...)
}

// InjectCheckpointVolume appends a volume named consts.CheckpointVolumeName,
// backed by the PersistentVolumeClaim pvcName, to podSpec. If a volume with
// that name already exists, podSpec is left unchanged.
func InjectCheckpointVolume(podSpec *corev1.PodSpec, pvcName string) {
	for i := range podSpec.Volumes {
		if podSpec.Volumes[i].Name == consts.CheckpointVolumeName {
			return // already present; keep the existing definition
		}
	}

	claim := &corev1.PersistentVolumeClaimVolumeSource{ClaimName: pvcName}
	// Mounted writable on purpose: CRIU needs write access during restore.
	claim.ReadOnly = false

	vol := corev1.Volume{Name: consts.CheckpointVolumeName}
	vol.VolumeSource = corev1.VolumeSource{PersistentVolumeClaim: claim}
	podSpec.Volumes = append(podSpec.Volumes, vol)
}

// InjectCheckpointVolumeMount mounts the checkpoint volume
// (consts.CheckpointVolumeName) into container at basePath. If the container
// already has a mount with that name, it is left unchanged.
func InjectCheckpointVolumeMount(container *corev1.Container, basePath string) {
	for i := range container.VolumeMounts {
		if container.VolumeMounts[i].Name == consts.CheckpointVolumeName {
			return // already mounted
		}
	}

	mount := corev1.VolumeMount{
		Name:      consts.CheckpointVolumeName,
		MountPath: basePath,
	}
	// Writable on purpose: CRIU writes restore.log and restore-criu.conf during restore.
	mount.ReadOnly = false
	container.VolumeMounts = append(container.VolumeMounts, mount)
}

// InjectPodInfoVolume adds a Downward API volume (consts.PodInfoVolumeName) to
// podSpec. The volume exposes the pod identity fields as the files pod_name,
// pod_uid and pod_namespace, plus the DGD annotations (dynamo namespace,
// component, parent DGD name/namespace, discovery backend) as files.
//
// This matters for CRIU checkpoint/restore: environment variables in a restored
// process still hold values captured from the checkpoint source pod, whereas
// Downward API files always reflect the current pod's identity and DGD
// configuration. The call is a no-op if a volume with that name already exists.
func InjectPodInfoVolume(podSpec *corev1.PodSpec) {
	for i := range podSpec.Volumes {
		if podSpec.Volumes[i].Name == consts.PodInfoVolumeName {
			return
		}
	}

	// fromField projects the pod field selected by fieldPath into the file at path.
	fromField := func(path, fieldPath string) corev1.DownwardAPIVolumeFile {
		return corev1.DownwardAPIVolumeFile{
			Path:     path,
			FieldRef: &corev1.ObjectFieldSelector{FieldPath: fieldPath},
		}
	}
	// fromAnnotation projects the value of a single pod annotation into the file at path.
	fromAnnotation := func(path, annotation string) corev1.DownwardAPIVolumeFile {
		return fromField(path, "metadata.annotations['"+annotation+"']")
	}

	files := []corev1.DownwardAPIVolumeFile{
		// Pod identity fields.
		fromField("pod_name", consts.PodInfoFieldPodName),
		fromField("pod_uid", consts.PodInfoFieldPodUID),
		fromField("pod_namespace", consts.PodInfoFieldPodNamespace),
		// DGD info carried on annotations (read back after CRIU restore).
		fromAnnotation(consts.PodInfoFileDynNamespace, consts.AnnotationDynNamespace),
		fromAnnotation(consts.PodInfoFileDynComponent, consts.AnnotationDynComponent),
		fromAnnotation(consts.PodInfoFileDynParentDGDName, consts.AnnotationDynParentDGDName),
		fromAnnotation(consts.PodInfoFileDynParentDGDNS, consts.AnnotationDynParentDGDNS),
		fromAnnotation(consts.PodInfoFileDynDiscoveryBackend, consts.AnnotationDynDiscoveryBackend),
	}

	podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{
		Name: consts.PodInfoVolumeName,
		VolumeSource: corev1.VolumeSource{
			DownwardAPI: &corev1.DownwardAPIVolumeSource{Items: files},
		},
	})
}

// InjectPodInfoVolumeMount mounts the Downward API pod-info volume
// (consts.PodInfoVolumeName) read-only at consts.PodInfoMountPath. If the
// container already has a mount with that name, it is left unchanged.
func InjectPodInfoVolumeMount(container *corev1.Container) {
	for i := range container.VolumeMounts {
		if container.VolumeMounts[i].Name == consts.PodInfoVolumeName {
			return // already mounted
		}
	}

	mount := corev1.VolumeMount{Name: consts.PodInfoVolumeName}
	mount.MountPath = consts.PodInfoMountPath
	mount.ReadOnly = true
	container.VolumeMounts = append(container.VolumeMounts, mount)
}

327
// InjectCheckpointIntoPodSpec injects checkpoint configuration into a pod spec for
328
// external restore via the snapshot DaemonSet. The pod image is expected to be a
329
330
331
332
333
334
335
336
// runtime-compatible restore image (runtime + CRIU tooling). For ready checkpoints,
// the operator overrides command to `sleep infinity` so the watcher can trigger
// external restore via nsenter + nsrestore.
//
// Modifications applied:
//  1. Security context - seccomp profile (io_uring blocking, matches checkpoint environment)
//  2. Environment variables - checkpoint path and hash
//  3. Storage configuration - checkpoint PVC and Downward API (pod identity)
337
//
338
339
// No hostIPC, no privileged mode — those are only needed when CRIU runs inside the
// container. With external restore, all privilege lives in the DaemonSet.
340
341
342
func InjectCheckpointIntoPodSpec(
	podSpec *corev1.PodSpec,
	checkpointInfo *CheckpointInfo,
343
	checkpointConfig *configv1alpha1.CheckpointConfiguration,
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
) error {
	if checkpointInfo == nil || !checkpointInfo.Enabled {
		return nil
	}

	info := checkpointInfo
	if info.Hash == "" {
		if info.Identity == nil {
			return fmt.Errorf("checkpoint enabled but identity is nil and hash is not set")
		}
		hash, err := ComputeIdentityHash(*info.Identity)
		if err != nil {
			return fmt.Errorf("failed to compute identity hash: %w", err)
		}
		info.Hash = hash
	}

361
	// Find the main container (needed for volume mounts and env vars)
362
363
364
365
366
367
368
369
370
371
372
373
374
375
	var mainContainer *corev1.Container
	for i := range podSpec.Containers {
		if podSpec.Containers[i].Name == consts.MainContainerName {
			mainContainer = &podSpec.Containers[i]
			break
		}
	}
	if mainContainer == nil && len(podSpec.Containers) > 0 {
		mainContainer = &podSpec.Containers[0]
	}
	if mainContainer == nil {
		return fmt.Errorf("no container found to inject checkpoint config")
	}

376
377
378
379
380
381
382
	// When a ready checkpoint exists, override the container command to sleep infinity.
	// The DaemonSet watcher detects this pod via the checkpoint-restore label and
	// performs external restore (nsenter + nsrestore). When no checkpoint is ready,
	// the original command runs (cold start).
	if info.Ready {
		mainContainer.Command = []string{"sleep", "infinity"}
		mainContainer.Args = nil
383
384
	}

385
	// Seccomp profile to match checkpoint environment (blocks io_uring syscalls)
386
387
388
389
390
	if podSpec.SecurityContext == nil {
		podSpec.SecurityContext = &corev1.PodSecurityContext{}
	}
	podSpec.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
		Type:             corev1.SeccompProfileTypeLocalhost,
391
		LocalhostProfile: ptr.To(consts.SeccompProfilePath),
392
393
394
	}

	// Determine storage type and compute location/path
395
396
	storageType := configv1alpha1.CheckpointStorageTypePVC // default
	var storageConfig *configv1alpha1.CheckpointStorageConfiguration
397
398
399
400
401
402
403
404
	if checkpointConfig != nil {
		storageConfig = &checkpointConfig.Storage
		if storageConfig.Type != "" {
			storageType = storageConfig.Type
		}
	}

	switch storageType {
405
	case configv1alpha1.CheckpointStorageTypeS3:
406
		info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
407
408
		if storageConfig == nil || storageConfig.S3.URI == "" {
			return fmt.Errorf("S3 storage type selected but no S3 URI configured (set checkpoint.storage.s3.uri)")
409
		}
410
		info.Location = fmt.Sprintf("%s/%s.tar", storageConfig.S3.URI, info.Hash)
411

412
	case configv1alpha1.CheckpointStorageTypeOCI:
413
		info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
414
415
		if storageConfig == nil || storageConfig.OCI.URI == "" {
			return fmt.Errorf("OCI storage type selected but no OCI URI configured (set checkpoint.storage.oci.uri)")
416
		}
417
		info.Location = fmt.Sprintf("%s:%s", storageConfig.OCI.URI, info.Hash)
418

419
420
	default: // PVC
		info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
421
		basePath := getPVCBasePath(storageConfig)
422
423
424
425
426
427
		if storageConfig == nil || storageConfig.PVC.PVCName == "" {
			return fmt.Errorf("PVC storage type selected but no PVC name configured (set checkpoint.storage.pvc.pvcName)")
		}
		pvcName := storageConfig.PVC.PVCName
		if basePath == "" {
			return fmt.Errorf("PVC storage type selected but no PVC base path configured (set checkpoint.storage.pvc.basePath)")
428
429
430
431
432
433
434
		}
		info.Location = fmt.Sprintf("%s/%s", basePath, info.Hash)

		InjectCheckpointVolume(podSpec, pvcName)
		InjectCheckpointVolumeMount(mainContainer, basePath)
	}

435
	// Downward API volume for pod identity after CRIU restore
436
437
438
	InjectPodInfoVolume(podSpec)
	InjectPodInfoVolumeMount(mainContainer)

439
	// Checkpoint environment variables (path, hash)
440
441
442
443
444
	InjectCheckpointEnvVars(mainContainer, info, checkpointConfig)

	return nil
}

445
446
// InjectCheckpointLabelsFromConfig adds checkpoint identity labels to a label map based on config.
// Restore trigger labels are injected only when a concrete restore request is prepared.
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
func InjectCheckpointLabelsFromConfig(labels map[string]string, config *nvidiacomv1alpha1.ServiceCheckpointConfig) (map[string]string, error) {
	if config == nil || !config.Enabled {
		return labels, nil
	}

	if labels == nil {
		labels = make(map[string]string)
	}

	// Compute hash from identity if provided
	if config.Identity != nil {
		hash, err := ComputeIdentityHash(*config.Identity)
		if err != nil {
			return nil, fmt.Errorf("failed to compute identity hash for labels: %w", err)
		}
		labels[consts.KubeLabelCheckpointHash] = hash
	}

	return labels, nil
}