// Package controller implements the node-local control loop inside snapshot-agent.
// It does not own CRDs or replace the operator. Instead it watches labelled pods,
// acts on those scheduled on the current node, reads and annotates their
// checkpoint Jobs, holds checkpoint leases for those Jobs, and delegates
// CRIU/CUDA execution to the snapshot executor workflows.
package controller

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/containerd"
	"github.com/go-logr/logr"
	"github.com/google/uuid"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	"github.com/ai-dynamo/dynamo/deploy/snapshot/pkg/common"
	"github.com/ai-dynamo/dynamo/deploy/snapshot/pkg/executor"
	"github.com/ai-dynamo/dynamo/deploy/snapshot/pkg/types"
)

// Pod labels. The informers select pods whose "is-*" label has the value
// "true"; the checkpoint-hash label names the checkpoint to produce or restore.
const (
	kubeLabelIsCheckpointSource = "nvidia.com/snapshot-is-checkpoint-source"
	kubeLabelIsRestoreTarget    = "nvidia.com/snapshot-is-restore-target"
	kubeLabelCheckpointHash     = "nvidia.com/snapshot-checkpoint-hash"
)

// Annotations. Location and storage type are read from pods; checkpoint status
// is read from and written to checkpoint Jobs; restore status and the container
// instance it applies to are written to restore pods.
const (
	kubeAnnotationCheckpointLocation    = "nvidia.com/snapshot-checkpoint-location"
	kubeAnnotationCheckpointStorageType = "nvidia.com/snapshot-checkpoint-storage-type"
	kubeAnnotationCheckpointStatus      = "nvidia.com/snapshot-checkpoint-status"
	kubeAnnotationRestoreStatus         = "nvidia.com/snapshot-restore-status"
	kubeAnnotationRestoreContainerID    = "nvidia.com/snapshot-restore-container-id"
)
44

45
46
47
// NodeController watches local-node pods with checkpoint metadata and reconciles
// snapshot execution for checkpoint and restore requests.
type NodeController struct {
48
49
50
51
	config     *types.AgentConfig
	clientset  kubernetes.Interface
	containerd *containerd.Client
	log        logr.Logger
52
	holderID   string
53

54
55
	inFlight   map[string]struct{}
	inFlightMu sync.Mutex
56
57
58
59

	stopCh chan struct{}
}

60
61
// NewNodeController creates the node-local controller that runs inside snapshot-agent.
func NewNodeController(
62
63
64
	cfg *types.AgentConfig,
	containerd *containerd.Client,
	log logr.Logger,
65
) (*NodeController, error) {
66
67
68
69
70
71
72
73
74
75
	restConfig, err := rest.InClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
	}

	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
	}

76
	return &NodeController{
77
78
79
80
		config:     cfg,
		clientset:  clientset,
		containerd: containerd,
		log:        log,
81
		holderID:   "snapshot-agent/" + uuid.NewString(),
82
83
		inFlight:   make(map[string]struct{}),
		stopCh:     make(chan struct{}),
84
85
86
	}, nil
}

87
88
89
// Run starts the local pod informers and processes checkpoint/restore events.
func (w *NodeController) Run(ctx context.Context) error {
	w.log.Info("Starting snapshot node controller",
90
91
92
93
		"node", w.config.NodeName,
		"checkpoint", kubeLabelIsCheckpointSource,
		"restore", kubeLabelIsRestoreTarget,
	)
94

95
96
97
98
99
100
	var nsOptions []informers.SharedInformerOption
	if w.config.RestrictedNamespace != "" {
		w.log.Info("Restricting pod watching to namespace", "namespace", w.config.RestrictedNamespace)
		nsOptions = append(nsOptions, informers.WithNamespace(w.config.RestrictedNamespace))
	} else {
		w.log.Info("Watching pods cluster-wide (all namespaces)")
101
102
	}

103
104
105
106
107
	var syncFuncs []cache.InformerSynced

	// Checkpoint informer
	checkpointSelector := labels.SelectorFromSet(labels.Set{
		kubeLabelIsCheckpointSource: "true",
108
109
	}).String()

110
	ckptFactoryOpts := append([]informers.SharedInformerOption{
111
		informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
112
			opts.LabelSelector = checkpointSelector
113
		}),
114
	}, nsOptions...)
115

116
117
	ckptFactory := informers.NewSharedInformerFactoryWithOptions(
		w.clientset, 30*time.Second, ckptFactoryOpts...,
118
119
	)

120
	ckptInformer := ckptFactory.Core().V1().Pods().Informer()
121
	if _, err := ckptInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
122
		AddFunc: func(obj interface{}) {
123
124
125
126
			pod, ok := podFromInformerObj(obj)
			if !ok {
				return
			}
127
			w.reconcileCheckpointPod(ctx, pod)
128
		},
129
130
131
132
133
		UpdateFunc: func(_, newObj interface{}) {
			pod, ok := podFromInformerObj(newObj)
			if !ok {
				return
			}
134
			w.reconcileCheckpointPod(ctx, pod)
135
		},
136
137
138
	}); err != nil {
		return fmt.Errorf("failed to add checkpoint informer handler: %w", err)
	}
139
140
	go ckptFactory.Start(w.stopCh)
	syncFuncs = append(syncFuncs, ckptInformer.HasSynced)
141

142
143
144
145
	// Restore informer
	restoreSelector := labels.SelectorFromSet(labels.Set{
		kubeLabelIsRestoreTarget: "true",
	}).String()
146

147
148
149
150
151
	restoreFactoryOpts := append([]informers.SharedInformerOption{
		informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
			opts.LabelSelector = restoreSelector
		}),
	}, nsOptions...)
152

153
154
155
	restoreFactory := informers.NewSharedInformerFactoryWithOptions(
		w.clientset, 30*time.Second, restoreFactoryOpts...,
	)
156

157
	restoreInformer := restoreFactory.Core().V1().Pods().Informer()
158
	if _, err := restoreInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
159
160
161
162
163
		AddFunc: func(obj interface{}) {
			pod, ok := podFromInformerObj(obj)
			if !ok {
				return
			}
164
			w.reconcileRestorePod(ctx, pod)
165
166
167
168
169
170
		},
		UpdateFunc: func(_, newObj interface{}) {
			pod, ok := podFromInformerObj(newObj)
			if !ok {
				return
			}
171
			w.reconcileRestorePod(ctx, pod)
172
		},
173
174
175
	}); err != nil {
		return fmt.Errorf("failed to add restore informer handler: %w", err)
	}
176
177
	go restoreFactory.Start(w.stopCh)
	syncFuncs = append(syncFuncs, restoreInformer.HasSynced)
178

179
180
	if !cache.WaitForCacheSync(w.stopCh, syncFuncs...) {
		return fmt.Errorf("failed to sync informer caches")
181
182
	}

183
	w.log.Info("Snapshot node controller started and caches synced")
184
	<-ctx.Done()
185
	close(w.stopCh)
186
	return nil
187
188
}

189
func (w *NodeController) reconcileCheckpointPod(ctx context.Context, pod *corev1.Pod) {
190
191
192
	if pod.Spec.NodeName != w.config.NodeName {
		return
	}
193
	if !isPodReady(pod) {
194
195
196
197
198
		return
	}

	podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)

199
200
201
	checkpointHash, ok := pod.Labels[kubeLabelCheckpointHash]
	if !ok || checkpointHash == "" {
		w.log.Info("Pod has checkpoint label but no checkpoint-hash label", "pod", podKey)
202
203
204
		return
	}

205
206
207
208
209
210
211
212
	job, err := getCheckpointJob(ctx, w.clientset, pod)
	if err != nil {
		w.log.Error(err, "Failed to resolve checkpoint job", "pod", podKey)
		return
	}

	jobStatus := job.Annotations[kubeAnnotationCheckpointStatus]
	if jobStatus == "completed" || jobStatus == "failed" {
213
214
215
		return
	}

216
217
	if !w.tryAcquire(podKey) {
		return
218
219
	}

220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
	checkpointLocation, checkpointStorageType, err := checkpointStorageFromPod(pod)
	if err != nil {
		w.release(podKey)
		w.log.Error(err, "Checkpoint pod is missing storage metadata", "pod", podKey, "checkpoint_hash", checkpointHash)
		return
	}

	acquiredLease, err := acquireCheckpointLease(ctx, w.clientset, w.log, job, w.holderID)
	if err != nil {
		w.release(podKey)
		w.log.Error(err, "Failed to acquire checkpoint lease", "pod", podKey, "checkpoint_hash", checkpointHash)
		return
	}
	if !acquiredLease {
		w.release(podKey)
		return
	}

238
	w.log.Info("Pod ready, triggering checkpoint", "pod", podKey, "checkpoint_hash", checkpointHash)
239
	emitPodEvent(ctx, w.clientset, w.log, pod, "snapshot", corev1.EventTypeNormal, "CheckpointRequested", fmt.Sprintf("Checkpoint requested: %s", checkpointHash))
240

241
	go func() {
242
		if err := w.runCheckpoint(ctx, pod, job, checkpointHash, checkpointLocation, checkpointStorageType, podKey); err != nil {
243
			opLog := w.log.WithValues("pod", podKey, "checkpoint_hash", checkpointHash)
244
			opLog.Error(err, "Checkpoint controller worker failed")
245
246
247
			emitPodEvent(ctx, w.clientset, opLog, pod, "snapshot", corev1.EventTypeWarning, "CheckpointWorkerFailed", err.Error())
		}
	}()
248
249
}

250
func (w *NodeController) reconcileRestorePod(ctx context.Context, pod *corev1.Pod) {
251
252
	if pod.Spec.NodeName != w.config.NodeName {
		return
253
254
	}

255
	podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
256

257
	if pod.Status.Phase != corev1.PodRunning {
258
259
260
		return
	}

261
262
	if isPodReady(pod) {
		return
263
264
	}

265
266
267
	checkpointHash, ok := pod.Labels[kubeLabelCheckpointHash]
	if !ok || checkpointHash == "" {
		w.log.Info("Restore pod has no checkpoint-hash label", "pod", podKey)
268
269
270
		return
	}

271
272
	if strings.ContainsAny(checkpointHash, "/\\") || strings.Contains(checkpointHash, "..") || filepath.Clean(checkpointHash) != checkpointHash {
		w.log.Error(fmt.Errorf("invalid checkpoint hash %q", checkpointHash), "Invalid checkpoint hash on restore pod", "pod", podKey)
273
274
275
		return
	}

276
277
278
279
280
281
282
	checkpointLocation, checkpointStorageType, err := checkpointStorageFromPod(pod)
	if err != nil {
		w.log.Error(err, "Restore pod is missing storage metadata", "pod", podKey, "checkpoint_hash", checkpointHash)
		return
	}
	if _, err := os.Stat(checkpointLocation); os.IsNotExist(err) {
		w.log.V(1).Info("Checkpoint not ready on disk, skipping restore", "pod", podKey, "checkpoint_hash", checkpointHash, "checkpoint_location", checkpointLocation)
283
		return
284
285
	}

286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
	containerName := resolveMainContainerName(pod)
	if containerName == "" {
		w.log.Info("Restore pod has no containers", "pod", podKey)
		return
	}

	containerID := ""
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.Name != containerName || cs.ContainerID == "" {
			continue
		}
		containerID = strings.TrimPrefix(cs.ContainerID, "containerd://")
		break
	}
	if containerID == "" {
		w.log.V(1).Info("Restore pod has no running main container yet", "pod", podKey, "container", containerName)
		return
	}

	annotationStatus := pod.Annotations[kubeAnnotationRestoreStatus]
	annotationContainerID := pod.Annotations[kubeAnnotationRestoreContainerID]
	if annotationContainerID == containerID && (annotationStatus == "completed" || annotationStatus == "in_progress") {
		return
	}

	restoreAttemptKey := fmt.Sprintf("%s/%s", podKey, containerID)
	if !w.tryAcquire(restoreAttemptKey) {
313
314
315
		return
	}

316
	w.log.Info("Restore pod running, triggering external restore", "pod", podKey, "checkpoint_hash", checkpointHash)
317
	emitPodEvent(ctx, w.clientset, w.log, pod, "snapshot", corev1.EventTypeNormal, "RestoreRequested", fmt.Sprintf("Restore requested from checkpoint %s", checkpointHash))
318

319
	go func() {
320
		if err := w.runRestore(ctx, pod, containerName, containerID, checkpointHash, checkpointLocation, checkpointStorageType, restoreAttemptKey); err != nil {
321
			opLog := w.log.WithValues("pod", podKey, "checkpoint_hash", checkpointHash)
322
			opLog.Error(err, "Restore controller worker failed")
323
324
325
			emitPodEvent(ctx, w.clientset, opLog, pod, "snapshot", corev1.EventTypeWarning, "RestoreWorkerFailed", err.Error())
		}
	}()
326
}
327

328
329
// runCheckpoint runs the full checkpoint workflow for a pod:
//  1. Hold and renew the checkpoint lease
330
//  2. Resolve the container ID and host PID
331
//  3. Call executor.Checkpoint (inspect → configure → CUDA lock/checkpoint → CRIU dump → rootfs diff)
332
//  4. SIGUSR1 the process on success (notify workload), SIGKILL on failure (terminate immediately)
333
334
335
//  5. Mark job as completed or failed
func (w *NodeController) runCheckpoint(ctx context.Context, pod *corev1.Pod, job *batchv1.Job, checkpointHash, checkpointLocation, checkpointStorageType, podKey string) error {
	releasePodOnExit := true
336
	defer func() {
337
		if releasePodOnExit {
338
339
340
			w.release(podKey)
		}
	}()
341
	log := w.log.WithValues("pod", podKey, "checkpoint_hash", checkpointHash)
342
343
	leaseCtx, stopLease := context.WithCancelCause(ctx)
	defer stopLease(nil)
344

345
346
347
348
	releaseLeaseOnExit := true
	defer func() {
		if !releaseLeaseOnExit {
			return
349
		}
350
351
352
353
354
355
		releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := releaseCheckpointLease(releaseCtx, w.clientset, log, job, w.holderID); err != nil {
			log.Error(err, "Failed to release checkpoint lease")
		}
	}()
356

357
358
359
360
361
362
363
364
365
	go w.renewCheckpointLease(leaseCtx, log, job, stopLease)

	setCheckpointStatus := func(value string) error {
		if err := annotateJob(ctx, w.clientset, log, job, map[string]string{
			kubeAnnotationCheckpointStatus: value,
		}); err != nil {
			releasePodOnExit = false
			releaseLeaseOnExit = false
			return fmt.Errorf("failed to persist terminal checkpoint status %q: %w", value, err)
366
367
368
		}
		return nil
	}
369
370
371
372
373
374

	// Resolve the target container
	containerName := resolveMainContainerName(pod)
	if containerName == "" {
		err := fmt.Errorf("no containers found in pod spec")
		log.Error(err, "Checkpoint failed")
375
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
376
377
378
379
		if statusErr := setCheckpointStatus("failed"); statusErr != nil {
			return statusErr
		}
		return nil
380
381
382
383
384
385
386
387
388
	}
	var containerID string
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.Name == containerName {
			containerID = strings.TrimPrefix(cs.ContainerID, "containerd://")
			break
		}
	}
	if containerID == "" {
389
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", "Could not resolve target container ID")
390
391
392
393
		if statusErr := setCheckpointStatus("failed"); statusErr != nil {
			return statusErr
		}
		return nil
394
395
	}

396
397
	// Resolve the container's host PID (needed for signaling after checkpoint)
	containerPID, _, err := common.ResolveContainer(ctx, w.containerd, containerID)
398
	if err != nil {
399
		log.Error(err, "Failed to resolve container")
400
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", fmt.Sprintf("Container resolve failed: %v", err))
401
402
403
404
		if statusErr := setCheckpointStatus("failed"); statusErr != nil {
			return statusErr
		}
		return nil
405
406
	}

407
	// Step 1: Run the checkpoint orchestrator
408
409
410
411
412
413
414
415
416
417
418
419
420
421
	req := executor.CheckpointRequest{
		ContainerID:           containerID,
		ContainerName:         containerName,
		CheckpointHash:        checkpointHash,
		CheckpointLocation:    checkpointLocation,
		CheckpointStorageType: checkpointStorageType,
		NodeName:              w.config.NodeName,
		PodName:               pod.Name,
		PodNamespace:          pod.Namespace,
	}
	if err := executor.Checkpoint(leaseCtx, w.containerd, log, req, w.config); err != nil {
		if cause := context.Cause(leaseCtx); cause != nil && cause != context.Canceled {
			err = fmt.Errorf("checkpoint lease lost: %w", cause)
		}
422
		log.Error(err, "Checkpoint failed")
423
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
424
425
		// SIGKILL on failure: process is unrecoverable (CUDA locked), terminate immediately
		if signalErr := common.SendSignalToPID(log, containerPID, syscall.SIGKILL, "checkpoint failed"); signalErr != nil {
426
427
			log.Error(signalErr, "Failed to signal checkpoint failure to runtime process")
		}
428
429
430
431
		if statusErr := setCheckpointStatus("failed"); statusErr != nil {
			return statusErr
		}
		return nil
432
433
	}

434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
	info, err := os.Stat(checkpointLocation)
	if err != nil || !info.IsDir() {
		if err == nil {
			err = fmt.Errorf("published checkpoint path %s is not a directory", checkpointLocation)
		} else {
			err = fmt.Errorf("published checkpoint path %s is missing: %w", checkpointLocation, err)
		}
		log.Error(err, "Checkpoint failed verification")
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
		if signalErr := common.SendSignalToPID(log, containerPID, syscall.SIGKILL, "checkpoint verification failed"); signalErr != nil {
			log.Error(signalErr, "Failed to signal checkpoint verification failure to runtime process")
		}
		if statusErr := setCheckpointStatus("failed"); statusErr != nil {
			return statusErr
		}
		return nil
	}

452
	// Step 2: SIGUSR1 on success: notify the workload that checkpoint completed
453
	emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeNormal, "CheckpointSucceeded", fmt.Sprintf("Checkpoint completed: %s", checkpointHash))
454
455
	if err := common.SendSignalToPID(log, containerPID, syscall.SIGUSR1, "checkpoint complete"); err != nil {
		log.Error(err, "Failed to signal checkpoint completion to runtime process")
456
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
457
458
459
460
		if statusErr := setCheckpointStatus("failed"); statusErr != nil {
			return statusErr
		}
		return nil
461
462
	}

463
464
465
466
	if err := setCheckpointStatus("completed"); err != nil {
		return err
	}
	return nil
467
468
}

469
470
471
// runRestore runs the full restore workflow for a pod:
//  1. Mark the current container instance as in_progress
//  2. Call executor.Restore (inspect placeholder → nsrestore inside namespace)
472
473
//  3. SIGCONT the restored process to wake it up
//  4. Wait for the pod to become Ready
474
475
//  5. Mark the container instance as completed
func (w *NodeController) runRestore(ctx context.Context, pod *corev1.Pod, containerName, containerID, checkpointHash, checkpointLocation, checkpointStorageType, restoreAttemptKey string) error {
476
477
478
	releaseOnExit := true
	defer func() {
		if releaseOnExit {
479
			w.release(restoreAttemptKey)
480
481
		}
	}()
482
483
	podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
	log := w.log.WithValues("pod", podKey, "checkpoint_hash", checkpointHash, "container_id", containerID)
484
485
	setRestoreStatus := func(value string) error {
		annotations := map[string]string{
486
487
			kubeAnnotationRestoreStatus:      value,
			kubeAnnotationRestoreContainerID: containerID,
488
		}
489
490
		if err := annotatePod(ctx, w.clientset, log, pod, annotations); err != nil {
			if value == "completed" {
491
492
493
494
495
496
497
				releaseOnExit = false
				return fmt.Errorf("failed to persist terminal restore status %q: %w", value, err)
			}
			return fmt.Errorf("failed to update restore status %q: %w", value, err)
		}
		return nil
	}
498
499

	if err := annotatePod(ctx, w.clientset, log, pod, map[string]string{
500
501
		kubeAnnotationRestoreStatus:      "in_progress",
		kubeAnnotationRestoreContainerID: containerID,
502
	}); err != nil {
503
		return fmt.Errorf("failed to annotate pod with restore in_progress: %w", err)
504
	}
505

506
	// Step 1: Run the restore orchestrator (inspect + nsrestore)
507
508
509
510
511
512
513
514
515
516
	req := executor.RestoreRequest{
		CheckpointHash:        checkpointHash,
		CheckpointLocation:    checkpointLocation,
		CheckpointStorageType: checkpointStorageType,
		NSRestorePath:         w.config.Restore.NSRestorePath,
		PodName:               pod.Name,
		PodNamespace:          pod.Namespace,
		ContainerName:         containerName,
	}
	restoredPID, err := executor.Restore(ctx, w.containerd, log, req)
517
518
	if err != nil {
		log.Error(err, "External restore failed")
519
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "RestoreFailed", err.Error())
520
521
522
523
524
525
526
527
		placeholderHostPID, _, pidErr := common.ResolveContainerByPod(ctx, w.containerd, pod.Name, pod.Namespace, containerName)
		if pidErr != nil {
			releaseOnExit = false
			return fmt.Errorf("restore failed and placeholder PID could not be resolved: %w", pidErr)
		}
		if killErr := common.SendSignalToPID(log, placeholderHostPID, syscall.SIGKILL, "restore failed"); killErr != nil {
			releaseOnExit = false
			return fmt.Errorf("restore failed and placeholder could not be killed: %w", killErr)
528
529
		}
		return nil
530
531
	}

532
533
	// Step 2: SIGCONT the restored process via PID namespace
	placeholderHostPID, _, err := common.ResolveContainerByPod(ctx, w.containerd, pod.Name, pod.Namespace, containerName)
534
	if err != nil {
535
		log.Error(err, "Failed to resolve placeholder host PID for signaling")
536
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "RestoreFailed", err.Error())
537
538
		releaseOnExit = false
		return fmt.Errorf("failed to resolve placeholder host PID for signaling: %w", err)
539
540
541
	}
	if err := common.SendSignalViaPIDNamespace(ctx, log, placeholderHostPID, restoredPID, syscall.SIGCONT, "restore complete"); err != nil {
		log.Error(err, "Failed to signal restored runtime process")
542
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "RestoreFailed", err.Error())
543
544
		if killErr := common.SendSignalToPID(log, placeholderHostPID, syscall.SIGKILL, "restore signaling failed"); killErr != nil {
			log.Error(killErr, "Failed to kill placeholder after restore signaling failure")
545
		}
546
547
		releaseOnExit = false
		return fmt.Errorf("failed to signal restored runtime process: %w", err)
548
549
	}

550
551
552
553
554
555
556
557
558
	// Step 3: Wait for the pod to become Ready
	readyCtx := ctx
	if timeout := w.config.Restore.RestoreReadyTimeout(); timeout > 0 {
		var cancel context.CancelFunc
		readyCtx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	if err := waitForPodReady(readyCtx, w.clientset, pod.Namespace, pod.Name, containerName); err != nil {
		log.Error(err, "Restore post-signal readiness check failed")
559
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "RestoreFailed", err.Error())
560
561
		if killErr := common.SendSignalToPID(log, placeholderHostPID, syscall.SIGKILL, "restore readiness failed"); killErr != nil {
			log.Error(killErr, "Failed to kill placeholder after restore readiness failure")
562
		}
563
564
		releaseOnExit = false
		return fmt.Errorf("restore post-signal readiness check failed: %w", err)
565
566
	}

567
	emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeNormal, "RestoreSucceeded", fmt.Sprintf("Restore completed from checkpoint %s", checkpointHash))
568
569
570
571
	if err := setRestoreStatus("completed"); err != nil {
		return err
	}
	return nil
572
573
}

574
func (w *NodeController) tryAcquire(podKey string) bool {
575
576
577
578
579
580
581
582
583
	w.inFlightMu.Lock()
	defer w.inFlightMu.Unlock()
	if _, held := w.inFlight[podKey]; held {
		return false
	}
	w.inFlight[podKey] = struct{}{}
	return true
}

584
func (w *NodeController) release(podKey string) {
585
586
587
	w.inFlightMu.Lock()
	defer w.inFlightMu.Unlock()
	delete(w.inFlight, podKey)
588
}
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605

// checkpointStorageFromPod returns the checkpoint location and storage type
// from the pod's annotations, with surrounding whitespace trimmed. It returns an
// error if either annotation is missing or blank (location is checked first),
// or if the storage type is anything other than "pvc".
func checkpointStorageFromPod(pod *corev1.Pod) (string, string, error) {
	annotations := pod.Annotations
	location := strings.TrimSpace(annotations[kubeAnnotationCheckpointLocation])
	storageType := strings.TrimSpace(annotations[kubeAnnotationCheckpointStorageType])

	switch {
	case location == "":
		return "", "", fmt.Errorf("missing %s annotation", kubeAnnotationCheckpointLocation)
	case storageType == "":
		return "", "", fmt.Errorf("missing %s annotation", kubeAnnotationCheckpointStorageType)
	case storageType != "pvc":
		return "", "", fmt.Errorf("checkpoint storage type %q is not supported", storageType)
	}
	return location, storageType, nil
}