controller.go 21.3 KB
Newer Older
1
2
3
4
5
// Package controller implements the node-local control loop inside snapshot-agent.
// It does not own CRDs or replace the operator. Instead it watches pod, job, and
// lease state on the current node and delegates CRIU/CUDA execution to the
// snapshot executor workflows.
package controller
6
7
8
9
10
11

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
12
	"strings"
13
	"sync"
14
	"syscall"
15
16
	"time"

17
18
	"github.com/containerd/containerd"
	"github.com/go-logr/logr"
19
20
	"github.com/google/uuid"
	batchv1 "k8s.io/api/batch/v1"
21
22
23
24
25
26
27
28
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

29
30
31
32
	"github.com/ai-dynamo/dynamo/deploy/snapshot/internal/executor"
	snapshotruntime "github.com/ai-dynamo/dynamo/deploy/snapshot/internal/runtime"
	"github.com/ai-dynamo/dynamo/deploy/snapshot/internal/types"
	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
33
)
34

35
36
37
// NodeController watches local-node pods with checkpoint metadata and reconciles
// snapshot execution for checkpoint and restore requests.
type NodeController struct {
38
39
40
41
	config     *types.AgentConfig
	clientset  kubernetes.Interface
	containerd *containerd.Client
	log        logr.Logger
42
	holderID   string
43

44
45
	inFlight   map[string]struct{}
	inFlightMu sync.Mutex
46
47
48
49

	stopCh chan struct{}
}

50
51
// NewNodeController creates the node-local controller that runs inside snapshot-agent.
func NewNodeController(
52
53
54
	cfg *types.AgentConfig,
	containerd *containerd.Client,
	log logr.Logger,
55
) (*NodeController, error) {
56
57
58
59
60
61
62
63
64
65
	restConfig, err := rest.InClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
	}

	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
	}

66
	return &NodeController{
67
68
69
70
		config:     cfg,
		clientset:  clientset,
		containerd: containerd,
		log:        log,
71
		holderID:   "snapshot-agent/" + uuid.NewString(),
72
73
		inFlight:   make(map[string]struct{}),
		stopCh:     make(chan struct{}),
74
75
76
	}, nil
}

77
78
79
// Run starts the local pod informers and processes checkpoint/restore events.
func (w *NodeController) Run(ctx context.Context) error {
	w.log.Info("Starting snapshot node controller",
80
		"node", w.config.NodeName,
81
82
		"checkpoint", snapshotprotocol.CheckpointSourceLabel,
		"restore", snapshotprotocol.RestoreTargetLabel,
83
	)
84

85
86
87
88
89
90
	var nsOptions []informers.SharedInformerOption
	if w.config.RestrictedNamespace != "" {
		w.log.Info("Restricting pod watching to namespace", "namespace", w.config.RestrictedNamespace)
		nsOptions = append(nsOptions, informers.WithNamespace(w.config.RestrictedNamespace))
	} else {
		w.log.Info("Watching pods cluster-wide (all namespaces)")
91
92
	}

93
94
95
96
	var syncFuncs []cache.InformerSynced

	// Checkpoint informer
	checkpointSelector := labels.SelectorFromSet(labels.Set{
97
		snapshotprotocol.CheckpointSourceLabel: "true",
98
99
	}).String()

100
	ckptFactoryOpts := append([]informers.SharedInformerOption{
101
		informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
102
			opts.LabelSelector = checkpointSelector
103
		}),
104
	}, nsOptions...)
105

106
107
	ckptFactory := informers.NewSharedInformerFactoryWithOptions(
		w.clientset, 30*time.Second, ckptFactoryOpts...,
108
109
	)

110
	ckptInformer := ckptFactory.Core().V1().Pods().Informer()
111
	if _, err := ckptInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
112
		AddFunc: func(obj interface{}) {
113
114
115
116
			pod, ok := podFromInformerObj(obj)
			if !ok {
				return
			}
117
			w.reconcileCheckpointPod(ctx, pod)
118
		},
119
120
121
122
123
		UpdateFunc: func(_, newObj interface{}) {
			pod, ok := podFromInformerObj(newObj)
			if !ok {
				return
			}
124
			w.reconcileCheckpointPod(ctx, pod)
125
		},
126
127
128
	}); err != nil {
		return fmt.Errorf("failed to add checkpoint informer handler: %w", err)
	}
129
130
	go ckptFactory.Start(w.stopCh)
	syncFuncs = append(syncFuncs, ckptInformer.HasSynced)
131

132
133
	// Restore informer
	restoreSelector := labels.SelectorFromSet(labels.Set{
134
		snapshotprotocol.RestoreTargetLabel: "true",
135
	}).String()
136

137
138
139
140
141
	restoreFactoryOpts := append([]informers.SharedInformerOption{
		informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
			opts.LabelSelector = restoreSelector
		}),
	}, nsOptions...)
142

143
144
145
	restoreFactory := informers.NewSharedInformerFactoryWithOptions(
		w.clientset, 30*time.Second, restoreFactoryOpts...,
	)
146

147
	restoreInformer := restoreFactory.Core().V1().Pods().Informer()
148
	if _, err := restoreInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
149
150
151
152
153
		AddFunc: func(obj interface{}) {
			pod, ok := podFromInformerObj(obj)
			if !ok {
				return
			}
154
			w.reconcileRestorePod(ctx, pod)
155
156
157
158
159
160
		},
		UpdateFunc: func(_, newObj interface{}) {
			pod, ok := podFromInformerObj(newObj)
			if !ok {
				return
			}
161
			w.reconcileRestorePod(ctx, pod)
162
		},
163
164
165
	}); err != nil {
		return fmt.Errorf("failed to add restore informer handler: %w", err)
	}
166
167
	go restoreFactory.Start(w.stopCh)
	syncFuncs = append(syncFuncs, restoreInformer.HasSynced)
168

169
170
	if !cache.WaitForCacheSync(w.stopCh, syncFuncs...) {
		return fmt.Errorf("failed to sync informer caches")
171
172
	}

173
	w.log.Info("Snapshot node controller started and caches synced")
174
	<-ctx.Done()
175
	close(w.stopCh)
176
	return nil
177
178
}

179
// reconcileCheckpointPod decides whether a checkpoint-source pod should be
// checkpointed now and, if so, launches runCheckpoint in a goroutine.
//
// A pod is skipped when it is not on this node, is not Ready, has a missing or
// malformed checkpoint ID, its checkpoint job cannot be resolved or is already
// marked completed/failed, another worker in this process holds the pod key,
// its storage location cannot be resolved, or the checkpoint lease is not
// obtained. Once the in-flight key has been claimed, every early return hands
// it back; after the worker is launched, runCheckpoint owns the key and lease.
func (w *NodeController) reconcileCheckpointPod(ctx context.Context, pod *corev1.Pod) {
	if pod.Spec.NodeName != w.config.NodeName || !isPodReady(pod) {
		return
	}

	podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)

	checkpointID := pod.Labels[snapshotprotocol.CheckpointIDLabel]
	if checkpointID == "" {
		w.log.Info("Pod has checkpoint label but no checkpoint-id label", "pod", podKey)
		return
	}

	// Apply the same ID rules the restore path enforces, before the ID is used
	// to derive a storage path. Otherwise a checkpoint could be written under
	// an ID that restore will always reject.
	if strings.ContainsAny(checkpointID, "/\\") || strings.Contains(checkpointID, "..") || filepath.Clean(checkpointID) != checkpointID {
		w.log.Error(fmt.Errorf("invalid checkpoint id %q", checkpointID), "Invalid checkpoint id on checkpoint pod", "pod", podKey)
		return
	}

	job, err := getCheckpointJob(ctx, w.clientset, pod)
	if err != nil {
		w.log.Error(err, "Failed to resolve checkpoint job", "pod", podKey)
		return
	}

	// A terminal status on the job means this checkpoint was already handled.
	jobStatus := job.Annotations[snapshotprotocol.CheckpointStatusAnnotation]
	if jobStatus == snapshotprotocol.CheckpointStatusCompleted || jobStatus == snapshotprotocol.CheckpointStatusFailed {
		return
	}

	if !w.tryAcquire(podKey) {
		return
	}

	checkpointLocation, err := w.checkpointLocationFromPod(pod, checkpointID)
	if err != nil {
		w.release(podKey)
		w.log.Error(err, "Checkpoint pod is missing storage metadata", "pod", podKey, "checkpoint_id", checkpointID)
		return
	}

	acquiredLease, err := acquireCheckpointLease(ctx, w.clientset, w.log, job, w.holderID)
	switch {
	case err != nil:
		w.release(podKey)
		w.log.Error(err, "Failed to acquire checkpoint lease", "pod", podKey, "checkpoint_id", checkpointID)
		return
	case !acquiredLease:
		// No error but no lease either: leave the job to whoever holds it.
		w.release(podKey)
		return
	}

	w.log.Info("Pod ready, triggering checkpoint", "pod", podKey, "checkpoint_id", checkpointID)
	emitPodEvent(ctx, w.clientset, w.log, pod, "snapshot", corev1.EventTypeNormal, "CheckpointRequested", fmt.Sprintf("Checkpoint requested: %s", checkpointID))

	go func() {
		if err := w.runCheckpoint(ctx, pod, job, checkpointID, checkpointLocation, podKey); err != nil {
			opLog := w.log.WithValues("pod", podKey, "checkpoint_id", checkpointID)
			opLog.Error(err, "Checkpoint controller worker failed")
			emitPodEvent(ctx, w.clientset, opLog, pod, "snapshot", corev1.EventTypeWarning, "CheckpointWorkerFailed", err.Error())
		}
	}()
}

240
// reconcileRestorePod decides whether a restore-target pod should be restored
// now and, if so, launches runRestore in a goroutine.
//
// A pod is skipped when it is not on this node, is not in the Running phase,
// has a missing or malformed checkpoint ID, its storage location cannot be
// resolved or does not exist on disk yet, its main container has no container
// ID yet, the current container instance is already annotated as in progress
// or completed, or another worker in this process holds the attempt key. The
// attempt key combines the pod key with the container ID, so each container
// instance is claimed separately.
func (w *NodeController) reconcileRestorePod(ctx context.Context, pod *corev1.Pod) {
	if pod.Spec.NodeName != w.config.NodeName || pod.Status.Phase != corev1.PodRunning {
		return
	}

	podKey := pod.Namespace + "/" + pod.Name

	checkpointID := pod.Labels[snapshotprotocol.CheckpointIDLabel]
	if checkpointID == "" {
		w.log.Info("Restore pod has no checkpoint-id label", "pod", podKey)
		return
	}

	// Reject IDs that contain a path separator or "..", or that are not
	// already in cleaned form.
	hasSeparator := strings.ContainsAny(checkpointID, "/\\")
	hasDotDot := strings.Contains(checkpointID, "..")
	if hasSeparator || hasDotDot || filepath.Clean(checkpointID) != checkpointID {
		w.log.Error(fmt.Errorf("invalid checkpoint id %q", checkpointID), "Invalid checkpoint id on restore pod", "pod", podKey)
		return
	}

	checkpointLocation, locErr := w.checkpointLocationFromPod(pod, checkpointID)
	if locErr != nil {
		w.log.Error(locErr, "Restore pod is missing storage metadata", "pod", podKey, "checkpoint_id", checkpointID)
		return
	}

	// Only a definite "does not exist" defers the restore; any other stat
	// outcome falls through to the restore attempt.
	if _, statErr := os.Stat(checkpointLocation); os.IsNotExist(statErr) {
		w.log.V(1).Info("Checkpoint not ready on disk, skipping restore", "pod", podKey, "checkpoint_id", checkpointID, "checkpoint_location", checkpointLocation)
		return
	}

	containerName := resolveMainContainerName(pod)
	if containerName == "" {
		w.log.Info("Restore pod has no containers", "pod", podKey)
		return
	}

	// Take the first status entry for the main container that reports an ID.
	var containerID string
	for i := range pod.Status.ContainerStatuses {
		status := &pod.Status.ContainerStatuses[i]
		if status.Name == containerName && status.ContainerID != "" {
			containerID = strings.TrimPrefix(status.ContainerID, "containerd://")
			break
		}
	}
	if containerID == "" {
		w.log.V(1).Info("Restore pod has no running main container yet", "pod", podKey, "container", containerName)
		return
	}

	// Annotations recorded for this exact container instance mean a restore is
	// already under way or finished.
	if pod.Annotations[snapshotprotocol.RestoreContainerIDAnnotation] == containerID {
		recorded := pod.Annotations[snapshotprotocol.RestoreStatusAnnotation]
		if recorded == snapshotprotocol.RestoreStatusCompleted || recorded == snapshotprotocol.RestoreStatusInProgress {
			return
		}
	}

	restoreAttemptKey := podKey + "/" + containerID
	if !w.tryAcquire(restoreAttemptKey) {
		return
	}

	w.log.Info("Restore pod running, triggering external restore", "pod", podKey, "checkpoint_id", checkpointID)
	emitPodEvent(ctx, w.clientset, w.log, pod, "snapshot", corev1.EventTypeNormal, "RestoreRequested", fmt.Sprintf("Restore requested from checkpoint %s", checkpointID))

	go func() {
		workerErr := w.runRestore(ctx, pod, containerName, containerID, checkpointID, checkpointLocation, restoreAttemptKey)
		if workerErr == nil {
			return
		}
		opLog := w.log.WithValues("pod", podKey, "checkpoint_id", checkpointID)
		opLog.Error(workerErr, "Restore controller worker failed")
		emitPodEvent(ctx, w.clientset, opLog, pod, "snapshot", corev1.EventTypeWarning, "RestoreWorkerFailed", workerErr.Error())
	}()
}
313

314
315
// runCheckpoint runs the full checkpoint workflow for a pod:
//  1. Hold and renew the checkpoint lease
316
//  2. Resolve the container ID and host PID
317
//  3. Call executor.Checkpoint (inspect → configure → CUDA lock/checkpoint → CRIU dump → rootfs diff)
318
//  4. SIGUSR1 the process on success (notify workload), SIGKILL on failure (terminate immediately)
319
//  5. Mark job as completed or failed
320
func (w *NodeController) runCheckpoint(ctx context.Context, pod *corev1.Pod, job *batchv1.Job, checkpointID, checkpointLocation, podKey string) error {
321
	releasePodOnExit := true
322
	defer func() {
323
		if releasePodOnExit {
324
325
326
			w.release(podKey)
		}
	}()
327
	log := w.log.WithValues("pod", podKey, "checkpoint_id", checkpointID)
328
329
	leaseCtx, stopLease := context.WithCancelCause(ctx)
	defer stopLease(nil)
330

331
332
333
334
	releaseLeaseOnExit := true
	defer func() {
		if !releaseLeaseOnExit {
			return
335
		}
336
337
338
339
340
341
		releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := releaseCheckpointLease(releaseCtx, w.clientset, log, job, w.holderID); err != nil {
			log.Error(err, "Failed to release checkpoint lease")
		}
	}()
342

343
344
345
346
	go w.renewCheckpointLease(leaseCtx, log, job, stopLease)

	setCheckpointStatus := func(value string) error {
		if err := annotateJob(ctx, w.clientset, log, job, map[string]string{
347
			snapshotprotocol.CheckpointStatusAnnotation: value,
348
349
350
351
		}); err != nil {
			releasePodOnExit = false
			releaseLeaseOnExit = false
			return fmt.Errorf("failed to persist terminal checkpoint status %q: %w", value, err)
352
353
354
		}
		return nil
	}
355
356
357
358
359
360

	// Resolve the target container
	containerName := resolveMainContainerName(pod)
	if containerName == "" {
		err := fmt.Errorf("no containers found in pod spec")
		log.Error(err, "Checkpoint failed")
361
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
362
		if statusErr := setCheckpointStatus(snapshotprotocol.CheckpointStatusFailed); statusErr != nil {
363
364
365
			return statusErr
		}
		return nil
366
367
368
369
370
371
372
373
374
	}
	var containerID string
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.Name == containerName {
			containerID = strings.TrimPrefix(cs.ContainerID, "containerd://")
			break
		}
	}
	if containerID == "" {
375
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", "Could not resolve target container ID")
376
		if statusErr := setCheckpointStatus(snapshotprotocol.CheckpointStatusFailed); statusErr != nil {
377
378
379
			return statusErr
		}
		return nil
380
381
	}

382
	// Resolve the container's host PID (needed for signaling after checkpoint)
383
	containerPID, _, err := snapshotruntime.ResolveContainer(ctx, w.containerd, containerID)
384
	if err != nil {
385
		log.Error(err, "Failed to resolve container")
386
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", fmt.Sprintf("Container resolve failed: %v", err))
387
		if statusErr := setCheckpointStatus(snapshotprotocol.CheckpointStatusFailed); statusErr != nil {
388
389
390
			return statusErr
		}
		return nil
391
392
	}

393
	// Step 1: Run the checkpoint orchestrator
394
	req := executor.CheckpointRequest{
395
396
397
398
399
400
401
402
		ContainerID:        containerID,
		ContainerName:      containerName,
		CheckpointID:       checkpointID,
		CheckpointLocation: checkpointLocation,
		NodeName:           w.config.NodeName,
		PodName:            pod.Name,
		PodNamespace:       pod.Namespace,
		Clientset:          w.clientset,
403
404
405
406
407
	}
	if err := executor.Checkpoint(leaseCtx, w.containerd, log, req, w.config); err != nil {
		if cause := context.Cause(leaseCtx); cause != nil && cause != context.Canceled {
			err = fmt.Errorf("checkpoint lease lost: %w", cause)
		}
408
		log.Error(err, "Checkpoint failed")
409
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
410
		// SIGKILL on failure: process is unrecoverable (CUDA locked), terminate immediately
411
		if signalErr := snapshotruntime.SendSignalToPID(log, containerPID, syscall.SIGKILL, "checkpoint failed"); signalErr != nil {
412
413
			log.Error(signalErr, "Failed to signal checkpoint failure to runtime process")
		}
414
		if statusErr := setCheckpointStatus(snapshotprotocol.CheckpointStatusFailed); statusErr != nil {
415
416
417
			return statusErr
		}
		return nil
418
419
	}

420
421
422
423
424
425
426
427
428
	info, err := os.Stat(checkpointLocation)
	if err != nil || !info.IsDir() {
		if err == nil {
			err = fmt.Errorf("published checkpoint path %s is not a directory", checkpointLocation)
		} else {
			err = fmt.Errorf("published checkpoint path %s is missing: %w", checkpointLocation, err)
		}
		log.Error(err, "Checkpoint failed verification")
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
429
		if signalErr := snapshotruntime.SendSignalToPID(log, containerPID, syscall.SIGKILL, "checkpoint verification failed"); signalErr != nil {
430
431
			log.Error(signalErr, "Failed to signal checkpoint verification failure to runtime process")
		}
432
		if statusErr := setCheckpointStatus(snapshotprotocol.CheckpointStatusFailed); statusErr != nil {
433
434
435
436
437
			return statusErr
		}
		return nil
	}

438
	// Step 2: SIGUSR1 on success: notify the workload that checkpoint completed
439
440
	emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeNormal, "CheckpointSucceeded", fmt.Sprintf("Checkpoint completed: %s", checkpointID))
	if err := snapshotruntime.SendSignalToPID(log, containerPID, syscall.SIGUSR1, "checkpoint complete"); err != nil {
441
		log.Error(err, "Failed to signal checkpoint completion to runtime process")
442
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
443
		if statusErr := setCheckpointStatus(snapshotprotocol.CheckpointStatusFailed); statusErr != nil {
444
445
446
			return statusErr
		}
		return nil
447
448
	}

449
	if err := setCheckpointStatus(snapshotprotocol.CheckpointStatusCompleted); err != nil {
450
451
452
		return err
	}
	return nil
453
454
}

455
456
457
// runRestore runs the full restore workflow for a pod:
//  1. Mark the current container instance as in_progress
//  2. Call executor.Restore (inspect placeholder → nsrestore inside namespace)
458
459
//  3. SIGCONT the restored process to wake it up
//  4. Wait for the pod to become Ready
460
//  5. Mark the container instance as completed
461
func (w *NodeController) runRestore(ctx context.Context, pod *corev1.Pod, containerName, containerID, checkpointID, checkpointLocation, restoreAttemptKey string) error {
462
463
464
	releaseOnExit := true
	defer func() {
		if releaseOnExit {
465
			w.release(restoreAttemptKey)
466
467
		}
	}()
468
	podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
469
	log := w.log.WithValues("pod", podKey, "checkpoint_id", checkpointID, "container_id", containerID)
470
471
	setRestoreStatus := func(value string) error {
		annotations := map[string]string{
472
473
			snapshotprotocol.RestoreStatusAnnotation:      value,
			snapshotprotocol.RestoreContainerIDAnnotation: containerID,
474
		}
475
		if err := annotatePod(ctx, w.clientset, log, pod, annotations); err != nil {
476
			if value == snapshotprotocol.RestoreStatusCompleted {
477
478
479
480
481
482
483
				releaseOnExit = false
				return fmt.Errorf("failed to persist terminal restore status %q: %w", value, err)
			}
			return fmt.Errorf("failed to update restore status %q: %w", value, err)
		}
		return nil
	}
484
485

	if err := annotatePod(ctx, w.clientset, log, pod, map[string]string{
486
487
		snapshotprotocol.RestoreStatusAnnotation:      snapshotprotocol.RestoreStatusInProgress,
		snapshotprotocol.RestoreContainerIDAnnotation: containerID,
488
	}); err != nil {
489
		return fmt.Errorf("failed to annotate pod with restore in_progress: %w", err)
490
	}
491

492
	// Step 1: Run the restore orchestrator (inspect + nsrestore)
493
	req := executor.RestoreRequest{
494
495
496
497
498
499
500
		CheckpointID:       checkpointID,
		CheckpointLocation: checkpointLocation,
		NSRestorePath:      w.config.Restore.NSRestorePath,
		PodName:            pod.Name,
		PodNamespace:       pod.Namespace,
		ContainerName:      containerName,
		Clientset:          w.clientset,
501
502
	}
	restoredPID, err := executor.Restore(ctx, w.containerd, log, req)
503
504
	if err != nil {
		log.Error(err, "External restore failed")
505
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "RestoreFailed", err.Error())
506
		placeholderHostPID, _, pidErr := snapshotruntime.ResolveContainerByPod(ctx, w.containerd, pod.Name, pod.Namespace, containerName)
507
508
509
510
		if pidErr != nil {
			releaseOnExit = false
			return fmt.Errorf("restore failed and placeholder PID could not be resolved: %w", pidErr)
		}
511
		if killErr := snapshotruntime.SendSignalToPID(log, placeholderHostPID, syscall.SIGKILL, "restore failed"); killErr != nil {
512
513
			releaseOnExit = false
			return fmt.Errorf("restore failed and placeholder could not be killed: %w", killErr)
514
515
		}
		return nil
516
517
	}

518
	// Step 2: SIGCONT the restored process via PID namespace
519
	placeholderHostPID, _, err := snapshotruntime.ResolveContainerByPod(ctx, w.containerd, pod.Name, pod.Namespace, containerName)
520
	if err != nil {
521
		log.Error(err, "Failed to resolve placeholder host PID for signaling")
522
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "RestoreFailed", err.Error())
523
524
		releaseOnExit = false
		return fmt.Errorf("failed to resolve placeholder host PID for signaling: %w", err)
525
	}
526
	if err := snapshotruntime.SendSignalViaPIDNamespace(ctx, log, placeholderHostPID, restoredPID, syscall.SIGCONT, "restore complete"); err != nil {
527
		log.Error(err, "Failed to signal restored runtime process")
528
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "RestoreFailed", err.Error())
529
		if killErr := snapshotruntime.SendSignalToPID(log, placeholderHostPID, syscall.SIGKILL, "restore signaling failed"); killErr != nil {
530
			log.Error(killErr, "Failed to kill placeholder after restore signaling failure")
531
		}
532
533
		releaseOnExit = false
		return fmt.Errorf("failed to signal restored runtime process: %w", err)
534
535
	}

536
537
538
539
540
541
542
543
544
	// Step 3: Wait for the pod to become Ready
	readyCtx := ctx
	if timeout := w.config.Restore.RestoreReadyTimeout(); timeout > 0 {
		var cancel context.CancelFunc
		readyCtx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	if err := waitForPodReady(readyCtx, w.clientset, pod.Namespace, pod.Name, containerName); err != nil {
		log.Error(err, "Restore post-signal readiness check failed")
545
		emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeWarning, "RestoreFailed", err.Error())
546
		if killErr := snapshotruntime.SendSignalToPID(log, placeholderHostPID, syscall.SIGKILL, "restore readiness failed"); killErr != nil {
547
			log.Error(killErr, "Failed to kill placeholder after restore readiness failure")
548
		}
549
550
		releaseOnExit = false
		return fmt.Errorf("restore post-signal readiness check failed: %w", err)
551
552
	}

553
554
	emitPodEvent(ctx, w.clientset, log, pod, "snapshot", corev1.EventTypeNormal, "RestoreSucceeded", fmt.Sprintf("Restore completed from checkpoint %s", checkpointID))
	if err := setRestoreStatus(snapshotprotocol.RestoreStatusCompleted); err != nil {
555
556
557
		return err
	}
	return nil
558
559
}

560
// tryAcquire claims podKey in the in-flight set. It reports true when the key
// was free and is now held by the caller, and false when the key is already
// held. A successful claim must later be returned with release.
func (w *NodeController) tryAcquire(podKey string) bool {
	w.inFlightMu.Lock()
	defer w.inFlightMu.Unlock()

	_, busy := w.inFlight[podKey]
	if !busy {
		w.inFlight[podKey] = struct{}{}
	}
	return !busy
}

570
// release removes podKey from the in-flight set so the key can be claimed
// again. Releasing a key that is not held is a no-op.
func (w *NodeController) release(podKey string) {
	w.inFlightMu.Lock()
	delete(w.inFlight, podKey)
	w.inFlightMu.Unlock()
}
575

576
577
578
579
580
581
582
583
584
585
586
// checkpointLocationFromPod resolves where the checkpoint identified by
// checkpointID lives. It passes the checkpoint ID, the pod's whitespace-trimmed
// artifact-version annotation, and the agent's configured storage type and
// base path to snapshotprotocol.ResolveCheckpointStorage, then returns the
// resolved location. Resolution errors are returned unchanged.
func (w *NodeController) checkpointLocationFromPod(pod *corev1.Pod, checkpointID string) (string, error) {
	artifactVersion := strings.TrimSpace(pod.Annotations[snapshotprotocol.CheckpointArtifactVersionAnnotation])
	agentStorage := snapshotprotocol.Storage{
		Type:     w.config.Storage.Type,
		BasePath: w.config.Storage.BasePath,
	}

	resolved, err := snapshotprotocol.ResolveCheckpointStorage(checkpointID, artifactVersion, agentStorage)
	if err != nil {
		return "", err
	}
	return resolved.Location, nil
}