Unverified Commit bb8fc8a4 authored by Schwinn Saereesitthipitak's avatar Schwinn Saereesitthipitak Committed by GitHub
Browse files

feat(chrek): external restore, signal-based IPC, and package refactor (#6286)


Co-authored-by: default avatarDan Feigin <dfeigin@nvidia.com>
parent c8423b57
package watcher
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
func podFromInformerObj(obj interface{}) (*corev1.Pod, bool) {
if pod, ok := obj.(*corev1.Pod); ok {
return pod, true
}
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
pod, ok := tombstone.Obj.(*corev1.Pod)
return pod, ok
}
func resolveMainContainerName(pod *corev1.Pod) string {
containerName := ""
for _, c := range pod.Spec.Containers {
if c.Name == "main" {
return c.Name
}
if containerName == "" {
containerName = c.Name
}
}
return containerName
}
func isPodReady(pod *corev1.Pod) bool {
if pod.Status.Phase != corev1.PodRunning {
return false
}
for _, cond := range pod.Status.Conditions {
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
return true
}
}
return false
}
func annotatePod(ctx context.Context, clientset kubernetes.Interface, log logr.Logger, pod *corev1.Pod, annotations map[string]string) error {
patchBytes, err := json.Marshal(map[string]any{
"metadata": map[string]any{
"annotations": annotations,
},
})
if err != nil {
return fmt.Errorf("failed to build annotation patch payload: %w", err)
}
_, err = clientset.CoreV1().Pods(pod.Namespace).Patch(
ctx, pod.Name, ktypes.MergePatchType, patchBytes, metav1.PatchOptions{},
)
if err != nil {
log.Error(err, "Failed to annotate pod",
"pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name),
"annotations", annotations,
)
}
return err
}
func waitForPodReady(ctx context.Context, clientset kubernetes.Interface, namespace, podName, containerName string) error {
lastPhase := ""
for {
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get pod %s/%s: %w", namespace, podName, err)
}
lastPhase = string(pod.Status.Phase)
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
return nil
}
}
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name != containerName {
continue
}
if cs.State.Terminated != nil {
return fmt.Errorf(
"pod %s/%s container %s terminated: reason=%s exitCode=%d",
namespace, podName, containerName,
cs.State.Terminated.Reason, cs.State.Terminated.ExitCode,
)
}
}
select {
case <-ctx.Done():
return fmt.Errorf("pod %s/%s did not become Ready (last phase: %s): %w", namespace, podName, lastPhase, ctx.Err())
case <-time.After(1 * time.Second):
}
}
}
func emitPodEvent(ctx context.Context, clientset kubernetes.Interface, log logr.Logger, pod *corev1.Pod, component, eventType, reason, message string) {
event := &corev1.Event{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", pod.Name),
Namespace: pod.Namespace,
},
InvolvedObject: corev1.ObjectReference{
Kind: "Pod",
Namespace: pod.Namespace,
Name: pod.Name,
UID: pod.UID,
APIVersion: "v1",
},
Type: eventType,
Reason: reason,
Message: message,
Source: corev1.EventSource{
Component: component,
},
Count: 1,
FirstTimestamp: metav1.Now(),
LastTimestamp: metav1.Now(),
}
if _, err := clientset.CoreV1().Events(pod.Namespace).Create(ctx, event, metav1.CreateOptions{}); err != nil {
log.Error(err, "Failed to create event",
"pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name),
"reason", reason,
"message", message,
)
}
}
// Package watcher provides Kubernetes pod watching for automatic checkpointing.
// Package watcher provides Kubernetes pod watching for automatic checkpoint/restore.
// The watcher is the sole entry point for chrek operations — it detects pods with
// checkpoint/restore labels and calls the orchestrators directly.
package watcher
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/sirupsen/logrus"
"github.com/containerd/containerd"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
......@@ -20,46 +23,38 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"github.com/ai-dynamo/dynamo/deploy/chrek/pkg/checkpoint"
"github.com/ai-dynamo/dynamo/deploy/chrek/pkg/common"
"github.com/ai-dynamo/dynamo/deploy/chrek/pkg/orchestrate"
"github.com/ai-dynamo/dynamo/deploy/chrek/pkg/types"
)
// SignalFile represents the content of a checkpoint completion signal file
type SignalFile struct {
CheckpointID string `json:"checkpoint_id"`
CheckpointPath string `json:"checkpoint_path"`
Timestamp time.Time `json:"timestamp"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
// WatcherConfig holds watcher configuration.
type WatcherConfig struct {
NodeName string
ListenAddr string // HTTP server address for health checks (e.g., ":8080")
RestrictedNamespace string // Optional: restrict watching to this namespace (empty = cluster-wide)
// Checkpoint configuration (from ConfigMap)
CheckpointSpec *checkpoint.CheckpointSpec
}
const (
kubeLabelIsCheckpointSource = "nvidia.com/chrek-is-checkpoint-source"
kubeLabelCheckpointHash = "nvidia.com/chrek-checkpoint-hash"
kubeLabelIsRestoreTarget = "nvidia.com/chrek-is-restore-target"
kubeAnnotationCheckpointStatus = "nvidia.com/chrek-checkpoint-status"
kubeAnnotationRestoreStatus = "nvidia.com/chrek-restore-status"
)
// Watcher watches for pods with checkpoint labels and triggers checkpoints
// Watcher watches for pods with checkpoint/restore labels and triggers operations.
type Watcher struct {
config WatcherConfig
clientset kubernetes.Interface
discoveryClient *checkpoint.DiscoveryClient
checkpointer *checkpoint.Checkpointer
log *logrus.Entry
config *types.AgentConfig
clientset kubernetes.Interface
containerd *containerd.Client
log logr.Logger
// Track pods checkpoint status: "in_progress", "completed", or "" (not started/failed)
checkpointed map[string]string
checkpointedMu sync.RWMutex
inFlight map[string]struct{}
inFlightMu sync.Mutex
stopCh chan struct{}
}
// NewWatcher creates a new pod watcher
func NewWatcher(cfg WatcherConfig, discoveryClient *checkpoint.DiscoveryClient, checkpointer *checkpoint.Checkpointer) (*Watcher, error) {
// Create in-cluster Kubernetes client
// NewWatcher creates a new pod watcher.
func NewWatcher(
cfg *types.AgentConfig,
containerd *containerd.Client,
log logr.Logger,
) (*Watcher, error) {
restConfig, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
......@@ -71,375 +66,366 @@ func NewWatcher(cfg WatcherConfig, discoveryClient *checkpoint.DiscoveryClient,
}
return &Watcher{
config: cfg,
clientset: clientset,
discoveryClient: discoveryClient,
checkpointer: checkpointer,
log: logrus.WithField("component", "watcher"),
checkpointed: make(map[string]string),
config: cfg,
clientset: clientset,
containerd: containerd,
log: log,
inFlight: make(map[string]struct{}),
stopCh: make(chan struct{}),
}, nil
}
// Start begins watching for pods and starts the health check server
// Start begins watching for pods and processing checkpoint/restore events.
func (w *Watcher) Start(ctx context.Context) error {
if w.config.CheckpointSpec == nil {
return fmt.Errorf("checkpoint spec is required")
}
w.log.WithFields(logrus.Fields{
"node": w.config.NodeName,
"label": checkpoint.KubeLabelCheckpointSource,
}).Info("Starting pod watcher")
w.log.Info("Starting pod watcher",
"node", w.config.NodeName,
"checkpoint", kubeLabelIsCheckpointSource,
"restore", kubeLabelIsRestoreTarget,
)
// Start health check HTTP server if address is configured
if w.config.ListenAddr != "" {
httpServer := w.startHealthServer(ctx)
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
httpServer.Shutdown(shutdownCtx)
}()
var nsOptions []informers.SharedInformerOption
if w.config.RestrictedNamespace != "" {
w.log.Info("Restricting pod watching to namespace", "namespace", w.config.RestrictedNamespace)
nsOptions = append(nsOptions, informers.WithNamespace(w.config.RestrictedNamespace))
} else {
w.log.Info("Watching pods cluster-wide (all namespaces)")
}
// Create informer factory with label selector and optional namespace restriction
labelSelector := labels.SelectorFromSet(labels.Set{
checkpoint.KubeLabelCheckpointSource: "true",
var syncFuncs []cache.InformerSynced
// Checkpoint informer
checkpointSelector := labels.SelectorFromSet(labels.Set{
kubeLabelIsCheckpointSource: "true",
}).String()
factoryOptions := []informers.SharedInformerOption{
ckptFactoryOpts := append([]informers.SharedInformerOption{
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector = labelSelector
opts.LabelSelector = checkpointSelector
}),
}
}, nsOptions...)
// If namespace is specified, restrict watching to that namespace
if w.config.RestrictedNamespace != "" {
w.log.WithField("namespace", w.config.RestrictedNamespace).Info("Restricting pod watching to namespace")
factoryOptions = append(factoryOptions, informers.WithNamespace(w.config.RestrictedNamespace))
} else {
w.log.Info("Watching pods cluster-wide (all namespaces)")
}
factory := informers.NewSharedInformerFactoryWithOptions(
w.clientset,
30*time.Second,
factoryOptions...,
ckptFactory := informers.NewSharedInformerFactoryWithOptions(
w.clientset, 30*time.Second, ckptFactoryOpts...,
)
podInformer := factory.Core().V1().Pods().Informer()
// Add event handlers
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
ckptInformer := ckptFactory.Core().V1().Pods().Informer()
ckptInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
w.handlePodEvent(ctx, pod)
pod, ok := podFromInformerObj(obj)
if !ok {
return
}
w.handleCheckpointPodEvent(ctx, pod)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pod := newObj.(*corev1.Pod)
w.handlePodEvent(ctx, pod)
UpdateFunc: func(_, newObj interface{}) {
pod, ok := podFromInformerObj(newObj)
if !ok {
return
}
w.handleCheckpointPodEvent(ctx, pod)
},
})
go ckptFactory.Start(w.stopCh)
syncFuncs = append(syncFuncs, ckptInformer.HasSynced)
// Start informer
go factory.Start(w.stopCh)
// Wait for cache sync
if !cache.WaitForCacheSync(w.stopCh, podInformer.HasSynced) {
return fmt.Errorf("failed to sync informer cache")
}
w.log.Info("Pod watcher started and cache synced")
// Wait for context cancellation
<-ctx.Done()
close(w.stopCh)
// Restore informer
restoreSelector := labels.SelectorFromSet(labels.Set{
kubeLabelIsRestoreTarget: "true",
}).String()
return nil
}
restoreFactoryOpts := append([]informers.SharedInformerOption{
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector = restoreSelector
}),
}, nsOptions...)
// HealthResponse is the response for health check endpoint
type HealthResponse struct {
Status string `json:"status"`
NodeName string `json:"node_name"`
}
restoreFactory := informers.NewSharedInformerFactoryWithOptions(
w.clientset, 30*time.Second, restoreFactoryOpts...,
)
// startHealthServer starts an HTTP server for health checks
func (w *Watcher) startHealthServer(ctx context.Context) *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/health", func(rw http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(rw, "Method not allowed", http.StatusMethodNotAllowed)
return
}
rw.Header().Set("Content-Type", "application/json")
json.NewEncoder(rw).Encode(HealthResponse{
Status: "healthy",
NodeName: w.config.NodeName,
})
restoreInformer := restoreFactory.Core().V1().Pods().Informer()
restoreInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod, ok := podFromInformerObj(obj)
if !ok {
return
}
w.handleRestorePodEvent(ctx, pod)
},
UpdateFunc: func(_, newObj interface{}) {
pod, ok := podFromInformerObj(newObj)
if !ok {
return
}
w.handleRestorePodEvent(ctx, pod)
},
})
go restoreFactory.Start(w.stopCh)
syncFuncs = append(syncFuncs, restoreInformer.HasSynced)
server := &http.Server{
Addr: w.config.ListenAddr,
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
if !cache.WaitForCacheSync(w.stopCh, syncFuncs...) {
return fmt.Errorf("failed to sync informer caches")
}
go func() {
w.log.WithField("addr", w.config.ListenAddr).Info("Starting health check server")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
w.log.WithError(err).Error("Health check server error")
}
}()
return server
}
// Stop stops the watcher
func (w *Watcher) Stop() {
w.log.Info("Pod watcher started and caches synced")
<-ctx.Done()
close(w.stopCh)
return nil
}
// handlePodEvent processes a pod event
func (w *Watcher) handlePodEvent(ctx context.Context, pod *corev1.Pod) {
// Filter to pods on this node
func (w *Watcher) handleCheckpointPodEvent(ctx context.Context, pod *corev1.Pod) {
if pod.Spec.NodeName != w.config.NodeName {
return
}
// Check if pod is Ready
if !w.isPodReady(pod) {
if !isPodReady(pod) {
return
}
// Check if we've already checkpointed this pod
podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
// Get checkpoint ID from label (uses the checkpoint hash)
checkpointID, ok := pod.Labels[checkpoint.KubeLabelCheckpointHash]
if !ok || checkpointID == "" {
w.log.WithField("pod", podKey).Warn("Pod has checkpoint label but no checkpoint-hash label")
checkpointHash, ok := pod.Labels[kubeLabelCheckpointHash]
if !ok || checkpointHash == "" {
w.log.Info("Pod has checkpoint label but no checkpoint-hash label", "pod", podKey)
return
}
// Check if checkpoint is already in progress or completed for this pod
w.checkpointedMu.Lock()
status := w.checkpointed[podKey]
if status == "completed" || status == "in_progress" {
w.checkpointedMu.Unlock()
annotationStatus := pod.Annotations[kubeAnnotationCheckpointStatus]
if annotationStatus == "completed" || annotationStatus == "in_progress" {
return
}
// Mark as in_progress to prevent concurrent attempts
w.checkpointed[podKey] = "in_progress"
w.checkpointedMu.Unlock()
// Trigger checkpoint
w.log.WithFields(logrus.Fields{
"pod": podKey,
"checkpoint_id": checkpointID,
}).Info("Pod ready, triggering checkpoint")
go w.doCheckpoint(ctx, pod, checkpointID, podKey)
}
// isPodReady checks if all containers in the pod are ready
func (w *Watcher) isPodReady(pod *corev1.Pod) bool {
if pod.Status.Phase != corev1.PodRunning {
return false
if !w.tryAcquire(podKey) {
return
}
for _, cond := range pod.Status.Conditions {
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
return true
}
}
w.log.Info("Pod ready, triggering checkpoint", "pod", podKey, "checkpoint_hash", checkpointHash)
emitPodEvent(ctx, w.clientset, w.log, pod, "chrek", corev1.EventTypeNormal, "CheckpointRequested", fmt.Sprintf("Checkpoint requested: %s", checkpointHash))
return false
go w.doCheckpoint(ctx, pod, checkpointHash, podKey)
}
// doCheckpoint performs the checkpoint and writes the signal file
func (w *Watcher) doCheckpoint(ctx context.Context, pod *corev1.Pod, checkpointID, podKey string) {
log := w.log.WithFields(logrus.Fields{
"pod": podKey,
"checkpoint_id": checkpointID,
})
// Find the main container and get signal file path from env
var containerID string
var containerName string
var signalFilePath string
for _, container := range pod.Spec.Containers {
if container.Name == "main" || len(pod.Spec.Containers) == 1 {
containerName = container.Name
// Get signal file path from environment
for _, env := range container.Env {
if env.Name == "DYN_CHECKPOINT_SIGNAL_FILE" {
signalFilePath = env.Value
break
}
}
break
}
func (w *Watcher) handleRestorePodEvent(ctx context.Context, pod *corev1.Pod) {
if pod.Spec.NodeName != w.config.NodeName {
return
}
// Get container ID from status
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name == "main" || len(pod.Status.ContainerStatuses) == 1 {
// Remove containerd:// prefix
containerID = cs.ContainerID
if len(containerID) > 13 && containerID[:13] == "containerd://" {
containerID = containerID[13:]
}
break
}
}
podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
if containerID == "" {
log.Error("Could not find container ID")
w.checkpointedMu.Lock()
delete(w.checkpointed, podKey)
w.checkpointedMu.Unlock()
if pod.Status.Phase != corev1.PodRunning {
return
}
if signalFilePath == "" {
log.Warn("No DYN_CHECKPOINT_SIGNAL_FILE env var found, signal file will not be written")
annotationStatus := pod.Annotations[kubeAnnotationRestoreStatus]
if isPodReady(pod) {
return
}
log.WithFields(logrus.Fields{
"container_id": containerID,
"signal_file_path": signalFilePath,
}).Info("Found container, starting checkpoint")
// Restore failures require explicit intervention (new label/update) before retry.
if annotationStatus == "completed" || annotationStatus == "in_progress" || annotationStatus == "failed" {
return
}
// Resolve container to get PID for signal file writing.
containerPID, _, err := w.discoveryClient.ResolveContainer(ctx, containerID)
if err != nil {
log.WithError(err).Error("Failed to resolve container")
w.checkpointedMu.Lock()
delete(w.checkpointed, podKey)
w.checkpointedMu.Unlock()
checkpointHash, ok := pod.Labels[kubeLabelCheckpointHash]
if !ok || checkpointHash == "" {
w.log.Info("Restore pod has no checkpoint-hash label", "pod", podKey)
return
}
// Validate CheckpointSpec is set
if w.config.CheckpointSpec == nil {
log.Error("CheckpointSpec is nil - cannot perform checkpoint")
w.checkpointedMu.Lock()
delete(w.checkpointed, podKey)
w.checkpointedMu.Unlock()
if strings.ContainsAny(checkpointHash, "/\\") || strings.Contains(checkpointHash, "..") || filepath.Clean(checkpointHash) != checkpointHash {
w.log.Error(fmt.Errorf("invalid checkpoint hash %q", checkpointHash), "Invalid checkpoint hash on restore pod", "pod", podKey)
return
}
// Perform checkpoint
params := checkpoint.CheckpointRequest{
ContainerID: containerID,
ContainerName: containerName,
CheckpointID: checkpointID,
CheckpointDir: w.config.CheckpointSpec.BasePath,
NodeName: w.config.NodeName,
PodName: pod.Name,
PodNamespace: pod.Namespace,
checkpointDir := filepath.Join(w.config.BasePath, checkpointHash)
if _, err := os.Stat(checkpointDir); os.IsNotExist(err) {
w.log.V(1).Info("Checkpoint not ready on disk, skipping restore", "pod", podKey, "checkpoint_hash", checkpointHash)
return
}
result, err := w.checkpointer.Checkpoint(ctx, params, w.config.CheckpointSpec)
if err != nil {
log.WithError(err).Error("Checkpoint failed")
// Write failure marker to PVC so restore pods know checkpoint failed
checkpointDir := filepath.Join(w.config.CheckpointSpec.BasePath, checkpointID)
w.writeCheckpointDoneMarker(checkpointDir, checkpointID, false, err.Error(), log)
if signalFilePath != "" {
w.writeSignalFileToPod(containerPID, signalFilePath, checkpointID, "", false, err.Error())
}
// Clear the in_progress status so checkpoint can be retried
w.checkpointedMu.Lock()
delete(w.checkpointed, podKey)
w.checkpointedMu.Unlock()
if !w.tryAcquire(podKey) {
return
}
log.WithField("checkpoint_dir", result.CheckpointDir).Info("Checkpoint completed successfully")
w.log.Info("Restore pod running, triggering external restore", "pod", podKey, "checkpoint_hash", checkpointHash)
emitPodEvent(ctx, w.clientset, w.log, pod, "chrek", corev1.EventTypeNormal, "RestoreRequested", fmt.Sprintf("Restore requested from checkpoint %s", checkpointHash))
// Write checkpoint.done marker to PVC for cross-node restore detection
w.writeCheckpointDoneMarker(result.CheckpointDir, checkpointID, true, "", log)
go w.doRestore(ctx, pod, checkpointHash, podKey)
}
// Write signal file to pod's hostPath for checkpoint job pod to exit
if signalFilePath != "" {
w.writeSignalFileToPod(containerPID, signalFilePath, checkpointID, result.CheckpointDir, true, "")
// doCheckpoint runs the full checkpoint workflow for a pod:
// 1. Mark pod as in_progress
// 2. Resolve the container ID and host PID
// 3. Call orchestrate.Checkpoint (inspect → configure → CUDA lock/checkpoint → CRIU dump → rootfs diff)
// 4. SIGUSR1 the process on success (notify workload), SIGUSR2 on failure (wake it up)
// 5. Mark pod as completed or failed
func (w *Watcher) doCheckpoint(ctx context.Context, pod *corev1.Pod, checkpointHash, podKey string) {
defer w.release(podKey)
log := w.log.WithValues("pod", podKey, "checkpoint_hash", checkpointHash)
if err := annotatePod(ctx, w.clientset, log, pod, map[string]string{
kubeAnnotationCheckpointStatus: "in_progress",
}); err != nil {
log.Error(err, "Failed to annotate pod with checkpoint in_progress")
return
}
// Mark as completed so we don't checkpoint again
w.checkpointedMu.Lock()
w.checkpointed[podKey] = "completed"
w.checkpointedMu.Unlock()
}
// writeSignalFileToPod writes a signal file to the checkpointed pod's filesystem
// via /proc/<pid>/root to indicate checkpoint completion
func (w *Watcher) writeSignalFileToPod(pid int, signalFilePath, checkpointID, checkpointPath string, success bool, errMsg string) {
signal := SignalFile{
CheckpointID: checkpointID,
CheckpointPath: checkpointPath,
Timestamp: time.Now().UTC(),
Success: success,
Error: errMsg,
// Resolve the target container
containerName := resolveMainContainerName(pod)
if containerName == "" {
err := fmt.Errorf("no containers found in pod spec")
log.Error(err, "Checkpoint failed")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationCheckpointStatus: "failed"})
return
}
var containerID string
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name == containerName {
containerID = strings.TrimPrefix(cs.ContainerID, "containerd://")
break
}
}
if containerID == "" {
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "CheckpointFailed", "Could not resolve target container ID")
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationCheckpointStatus: "failed"})
return
}
data, err := json.MarshalIndent(signal, "", " ")
// Resolve the container's host PID (needed for signaling after checkpoint)
containerPID, _, err := common.ResolveContainer(ctx, w.containerd, containerID)
if err != nil {
w.log.WithError(err).Error("Failed to marshal signal file")
log.Error(err, "Failed to resolve container")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "CheckpointFailed", fmt.Sprintf("Container resolve failed: %v", err))
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationCheckpointStatus: "failed"})
return
}
// Write to the pod's filesystem via /proc/<pid>/root
hostSignalPath := fmt.Sprintf("%s/%d/root%s", checkpoint.HostProcPath, pid, signalFilePath)
// Ensure signal directory exists in pod's filesystem
signalDir := filepath.Dir(hostSignalPath)
if err := os.MkdirAll(signalDir, 0755); err != nil {
w.log.WithError(err).WithField("path", signalDir).Error("Failed to create signal directory in pod")
// Step 1: Run the checkpoint orchestrator
req := orchestrate.CheckpointRequest{
ContainerID: containerID,
ContainerName: containerName,
CheckpointHash: checkpointHash,
CheckpointDir: w.config.BasePath,
NodeName: w.config.NodeName,
PodName: pod.Name,
PodNamespace: pod.Namespace,
}
if err := orchestrate.Checkpoint(ctx, w.containerd, log, req, w.config); err != nil {
log.Error(err, "Checkpoint failed")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
// SIGUSR2 on failure: tell the workload to wake up and continue
if signalErr := common.SendSignalToPID(log, containerPID, syscall.SIGUSR2, "checkpoint failed"); signalErr != nil {
log.Error(signalErr, "Failed to signal checkpoint failure to runtime process")
}
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationCheckpointStatus: "failed"})
return
}
if err := os.WriteFile(hostSignalPath, data, 0644); err != nil {
w.log.WithError(err).WithField("path", hostSignalPath).Error("Failed to write signal file to pod")
// Step 2: SIGUSR1 on success: notify the workload that checkpoint completed
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeNormal, "CheckpointSucceeded", fmt.Sprintf("Checkpoint completed: %s", checkpointHash))
if err := common.SendSignalToPID(log, containerPID, syscall.SIGUSR1, "checkpoint complete"); err != nil {
log.Error(err, "Failed to signal checkpoint completion to runtime process")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationCheckpointStatus: "failed"})
return
}
w.log.WithFields(logrus.Fields{
"host_path": hostSignalPath,
"pod_path": signalFilePath,
"pid": pid,
"success": success,
}).Info("Signal file written to pod filesystem")
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationCheckpointStatus: "completed"})
}
// writeCheckpointDoneMarker writes a checkpoint.done marker file to the checkpoint directory on shared PVC.
func (w *Watcher) writeCheckpointDoneMarker(checkpointDir, checkpointID string, success bool, errMsg string, log *logrus.Entry) {
markerPath := filepath.Join(checkpointDir, checkpoint.CheckpointDoneFilename)
// doRestore runs the full restore workflow for a pod:
// 1. Mark pod as in_progress
// 2. Call orchestrate.Restore (inspect placeholder → nsrestore inside namespace)
// 3. SIGCONT the restored process to wake it up
// 4. Wait for the pod to become Ready
// 5. Mark pod as completed or failed
func (w *Watcher) doRestore(ctx context.Context, pod *corev1.Pod, checkpointHash, podKey string) {
defer w.release(podKey)
log := w.log.WithValues("pod", podKey, "checkpoint_hash", checkpointHash)
if err := annotatePod(ctx, w.clientset, log, pod, map[string]string{
kubeAnnotationRestoreStatus: "in_progress",
}); err != nil {
log.Error(err, "Failed to annotate pod with restore in_progress")
return
}
marker := SignalFile{
CheckpointID: checkpointID,
CheckpointPath: checkpointDir,
Timestamp: time.Now().UTC(),
Success: success,
Error: errMsg,
containerName := resolveMainContainerName(pod)
if containerName == "" {
err := fmt.Errorf("no containers found in pod spec")
log.Error(err, "Restore failed")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "RestoreFailed", err.Error())
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationRestoreStatus: "failed"})
return
}
// Step 1: Run the restore orchestrator (inspect + nsrestore)
req := orchestrate.RestoreRequest{
CheckpointHash: checkpointHash,
CheckpointBase: w.config.BasePath,
NSRestorePath: w.config.Restore.NSRestorePath,
PodName: pod.Name,
PodNamespace: pod.Namespace,
ContainerName: containerName,
}
restoredPID, err := orchestrate.Restore(ctx, w.containerd, log, req)
if err != nil {
log.Error(err, "External restore failed")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "RestoreFailed", err.Error())
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationRestoreStatus: "failed"})
return
}
data, err := json.MarshalIndent(marker, "", " ")
// Step 2: SIGCONT the restored process via PID namespace
placeholderHostPID, _, err := common.ResolveContainerByPod(ctx, w.containerd, pod.Name, pod.Namespace, containerName)
if err != nil {
log.WithError(err).Error("Failed to marshal checkpoint.done marker")
log.Error(err, "Failed to resolve placeholder host PID for signaling")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "RestoreFailed", err.Error())
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationRestoreStatus: "failed"})
return
}
if err := common.SendSignalViaPIDNamespace(ctx, log, placeholderHostPID, restoredPID, syscall.SIGCONT, "restore complete"); err != nil {
log.Error(err, "Failed to signal restored runtime process")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "RestoreFailed", err.Error())
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationRestoreStatus: "failed"})
return
}
if err := os.WriteFile(markerPath, data, 0644); err != nil {
log.WithError(err).WithField("path", markerPath).Error("Failed to write checkpoint.done marker")
// Step 3: Wait for the pod to become Ready
readyCtx := ctx
if timeout := w.config.Restore.RestoreReadyTimeout(); timeout > 0 {
var cancel context.CancelFunc
readyCtx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
if err := waitForPodReady(readyCtx, w.clientset, pod.Namespace, pod.Name, containerName); err != nil {
log.Error(err, "Restore post-signal readiness check failed")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "RestoreFailed", err.Error())
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationRestoreStatus: "failed"})
return
}
log.WithFields(logrus.Fields{
"path": markerPath,
"success": success,
}).Info("checkpoint.done marker written to PVC")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeNormal, "RestoreSucceeded", fmt.Sprintf("Restore completed from checkpoint %s", checkpointHash))
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationRestoreStatus: "completed"})
}
func (w *Watcher) tryAcquire(podKey string) bool {
w.inFlightMu.Lock()
defer w.inFlightMu.Unlock()
if _, held := w.inFlight[podKey]; held {
return false
}
w.inFlight[podKey] = struct{}{}
return true
}
func (w *Watcher) release(podKey string) {
w.inFlightMu.Lock()
defer w.inFlightMu.Unlock()
delete(w.inFlight, podKey)
}
package watcher
import (
"context"
"os"
"path/filepath"
"testing"
"time"
"github.com/go-logr/logr/testr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"github.com/ai-dynamo/dynamo/deploy/chrek/pkg/types"
)
const testNodeName = "test-node"
// makeTestWatcher creates a Watcher with a fake k8s client and nil orchestrators.
// The fake clientset is empty so any goroutine launched by doCheckpoint/doRestore
// will fail on the first annotatePod call and exit cleanly.
func makeTestWatcher(t *testing.T) *Watcher {
t.Helper()
return &Watcher{
config: &types.AgentConfig{
NodeName: testNodeName,
BasePath: t.TempDir(),
},
clientset: fake.NewSimpleClientset(),
log: testr.New(t),
inFlight: make(map[string]struct{}),
stopCh: make(chan struct{}),
}
}
func makePod(name, namespace, nodeName string, phase corev1.PodPhase, ready bool, labels, annotations map[string]string) *corev1.Pod {
var conditions []corev1.PodCondition
if ready {
conditions = append(conditions, corev1.PodCondition{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
})
}
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: labels,
Annotations: annotations,
},
Spec: corev1.PodSpec{
NodeName: nodeName,
Containers: []corev1.Container{
{Name: "main"},
},
},
Status: corev1.PodStatus{
Phase: phase,
Conditions: conditions,
},
}
}
func TestHandleCheckpointPodEvent(t *testing.T) {
tests := []struct {
name string
nodeName string
phase corev1.PodPhase
ready bool
hash string
annotation string
preSeed bool // pre-populate inFlight to test deduplication
want bool // true = pod passes filtering and triggers checkpoint
}{
{
name: "happy path",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: true,
hash: "abc123",
want: true,
},
{
name: "wrong node",
nodeName: "other-node",
phase: corev1.PodRunning,
ready: true,
hash: "abc123",
want: false,
},
{
name: "not running",
nodeName: testNodeName,
phase: corev1.PodPending,
ready: false,
hash: "abc123",
want: false,
},
{
name: "running but not ready",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: false,
hash: "abc123",
want: false,
},
{
name: "missing hash label",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: true,
hash: "",
want: false,
},
{
name: "already completed",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: true,
hash: "abc123",
annotation: "completed",
want: false,
},
{
name: "already in progress",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: true,
hash: "abc123",
annotation: "in_progress",
want: false,
},
{
name: "duplicate in-flight",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: true,
hash: "abc123",
preSeed: true,
want: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
labels := map[string]string{
kubeLabelIsCheckpointSource: "true",
}
if tc.hash != "" {
labels[kubeLabelCheckpointHash] = tc.hash
}
var annotations map[string]string
if tc.annotation != "" {
annotations = map[string]string{
kubeAnnotationCheckpointStatus: tc.annotation,
}
}
pod := makePod("test-pod", "default", tc.nodeName, tc.phase, tc.ready, labels, annotations)
w := makeTestWatcher(t)
ctx := context.Background()
if tc.preSeed {
w.inFlight["default/test-pod"] = struct{}{}
}
w.handleCheckpointPodEvent(ctx, pod)
// tryAcquire adds to inFlight synchronously before launching the goroutine.
// For filtered pods, inFlight stays at its original size.
triggered := len(w.inFlight) > 0 && !tc.preSeed
if tc.preSeed {
// Duplicate: inFlight was 1 before and should remain exactly 1
triggered = false
}
if triggered != tc.want {
t.Errorf("triggered = %v, want %v (inFlight=%d, preSeed=%v)", triggered, tc.want, len(w.inFlight), tc.preSeed)
}
// Let the background goroutine (if any) finish before the test ends
if tc.want {
time.Sleep(50 * time.Millisecond)
}
})
}
}
func TestHandleRestorePodEvent(t *testing.T) {
tests := []struct {
name string
nodeName string
phase corev1.PodPhase
ready bool
hash string
annotation string
createDir bool // whether to create the checkpoint dir on disk
preSeed bool
want bool
}{
{
name: "happy path",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: false,
hash: "abc123",
createDir: true,
want: true,
},
{
name: "wrong node",
nodeName: "other-node",
phase: corev1.PodRunning,
ready: false,
hash: "abc123",
createDir: true,
want: false,
},
{
name: "not running",
nodeName: testNodeName,
phase: corev1.PodPending,
ready: false,
hash: "abc123",
createDir: true,
want: false,
},
{
name: "already ready",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: true,
hash: "abc123",
createDir: true,
want: false,
},
{
name: "missing hash",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: false,
hash: "",
want: false,
},
{
name: "invalid hash with path traversal",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: false,
hash: "../bad",
createDir: true,
want: false,
},
{
name: "already completed",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: false,
hash: "abc123",
annotation: "completed",
createDir: true,
want: false,
},
{
name: "already in progress",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: false,
hash: "abc123",
annotation: "in_progress",
createDir: true,
want: false,
},
{
name: "already failed",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: false,
hash: "abc123",
annotation: "failed",
createDir: true,
want: false,
},
{
name: "checkpoint not on disk",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: false,
hash: "abc123",
createDir: false,
want: false,
},
{
name: "duplicate in-flight",
nodeName: testNodeName,
phase: corev1.PodRunning,
ready: false,
hash: "abc123",
createDir: true,
preSeed: true,
want: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
labels := map[string]string{
kubeLabelIsRestoreTarget: "true",
}
if tc.hash != "" {
labels[kubeLabelCheckpointHash] = tc.hash
}
var annotations map[string]string
if tc.annotation != "" {
annotations = map[string]string{
kubeAnnotationRestoreStatus: tc.annotation,
}
}
pod := makePod("test-pod", "default", tc.nodeName, tc.phase, tc.ready, labels, annotations)
w := makeTestWatcher(t)
if tc.createDir && tc.hash != "" {
dir := filepath.Join(w.config.BasePath, tc.hash)
if err := os.MkdirAll(dir, 0o755); err != nil {
t.Fatalf("failed to create checkpoint dir: %v", err)
}
}
ctx := context.Background()
if tc.preSeed {
w.inFlight["default/test-pod"] = struct{}{}
}
w.handleRestorePodEvent(ctx, pod)
triggered := len(w.inFlight) > 0 && !tc.preSeed
if tc.preSeed {
triggered = false
}
if triggered != tc.want {
t.Errorf("triggered = %v, want %v (inFlight=%d, preSeed=%v)", triggered, tc.want, len(w.inFlight), tc.preSeed)
}
// Let the background goroutine (if any) finish before the test ends
if tc.want {
time.Sleep(50 * time.Millisecond)
}
})
}
}
# Chrek Helm Chart
> ⚠️ **Experimental Feature**: ChReK is currently in **beta/preview**. It requires privileged mode for restore operations, which may not be suitable for all production environments. See [Prerequisites](#prerequisites) for security considerations.
> ⚠️ **Experimental Feature**: ChReK is currently in **beta/preview**. The DaemonSet runs in privileged mode to perform CRIU operations. See [Prerequisites](#prerequisites) for security considerations.
This Helm chart deploys the checkpoint/restore infrastructure for NVIDIA Dynamo, including:
- Persistent Volume Claim (PVC) for checkpoint storage
......@@ -14,14 +14,14 @@ This Helm chart deploys the checkpoint/restore infrastructure for NVIDIA Dynamo,
## Prerequisites
⚠️ **Security Warning**: ChReK restore operations require **privileged mode**, which grants containers elevated host access. This may violate security policies in production environments. Only deploy in environments where privileged containers are acceptable.
⚠️ **Security Warning**: The ChReK DaemonSet runs in **privileged mode** with `hostPID`, `hostIPC`, and `hostNetwork` to perform CRIU checkpoint/restore operations. Workload pods do not need privileged mode. Only deploy in environments where a privileged DaemonSet is acceptable.
- Kubernetes 1.21+
- GPU nodes with NVIDIA runtime (`nvidia` runtime class)
- CRIU support in the container runtime (containerd with CRIU plugin)
- NVIDIA Dynamo operator installed (cluster-wide or namespace-scoped)
- containerd runtime (for container inspection; CRIU is bundled in ChReK images)
- NVIDIA Dynamo operator installed (cluster-wide or namespace-scoped), **or** manual pod configuration — see [Standalone Usage](../../../../docs/pages/kubernetes/chrek/standalone.md#using-chrek-without-the-dynamo-operator) for required labels, seccomp profiles, command overrides, and deployment strategy when running without the operator
- RWX (ReadWriteMany) storage class for multi-node deployments
- **Security clearance for privileged pods** (required for restore operations)
- **Security clearance for privileged DaemonSet** (the ChReK agent runs privileged with hostPID/hostIPC/hostNetwork)
## Installation
......@@ -63,11 +63,10 @@ See `values.yaml` for all configuration options.
| `storage.pvc.name` | PVC name (must match operator config) | `chrek-pvc` |
| `storage.pvc.size` | PVC size | `100Gi` |
| `storage.pvc.storageClass` | Storage class name | `""` (default) |
| `daemonset.image.repository` | DaemonSet image repository | `nvidia/chrek-agent` |
| `daemonset.image.repository` | DaemonSet image repository | `nvcr.io/nvidian/dynamo-dev/chrek-agent` |
| `daemonset.nodeSelector` | Node selector for GPU nodes | `nvidia.com/gpu.present: "true"` |
| `daemonset.runtimeClassName` | Runtime class for GPU access | `nvidia` |
| `daemonset.criu.timeout` | CRIU timeout in seconds | `"21600"` (6 hours) |
| `daemonset.criu.ghostLimit` | CRIU ghost file size limit | `"512MB"` |
| `config.checkpoint.criu.ghostLimit` | CRIU ghost file size limit in bytes | `536870912` (512MB) |
| `config.checkpoint.criu.logLevel` | CRIU logging verbosity (0-4) | `4` |
| `rbac.namespaceRestricted` | Use namespace-scoped RBAC | `true` |
## Usage
......
......@@ -10,52 +10,37 @@ metadata:
{{- include "chrek.labels" . | nindent 4 }}
data:
config.yaml: |
# Chrek Configuration
# This ConfigMap provides static configuration for the checkpoint agent.
# Dynamic values (NODE_NAME, RESTRICTED_NAMESPACE, etc.) come from environment variables.
basePath: {{ .Values.storage.pvc.basePath | quote }}
agent:
# How checkpoints are triggered: "http" for REST API, "watcher" for auto-checkpoint
signalSource: {{ .Values.config.agent.signalSource | quote }}
# Watcher/HTTP server address
listenAddr: {{ .Values.config.agent.listenAddr | quote }}
overlay:
systemDirs: {{ toYaml .Values.config.overlay.systemDirs | nindent 8 }}
cacheDirs: {{ toYaml .Values.config.overlay.cacheDirs | nindent 8 }}
additionalExclusions: {{ toYaml .Values.config.overlay.additionalExclusions | nindent 8 }}
checkpoint:
# Base path for checkpoint directories (shared PVC mount path)
basePath: {{ .Values.storage.pvc.basePath | quote }}
restore:
nsRestorePath: {{ .Values.config.restore.nsRestorePath | quote }}
restoreReadyTimeoutSeconds: {{ .Values.config.restore.restoreReadyTimeoutSeconds }}
criu:
# RPC options
ghostLimit: {{ .Values.config.checkpoint.criu.ghostLimit }}
timeout: {{ .Values.config.checkpoint.criu.timeout }}
logLevel: {{ .Values.config.checkpoint.criu.logLevel }}
workDir: {{ .Values.config.checkpoint.criu.workDir | quote }}
# K8s-specific options
leaveRunning: {{ .Values.config.checkpoint.criu.leaveRunning }}
shellJob: {{ .Values.config.checkpoint.criu.shellJob }}
tcpClose: {{ .Values.config.checkpoint.criu.tcpClose }}
fileLocks: {{ .Values.config.checkpoint.criu.fileLocks }}
orphanPtsMaster: {{ .Values.config.checkpoint.criu.orphanPtsMaster }}
extUnixSk: {{ .Values.config.checkpoint.criu.extUnixSk }}
linkRemap: {{ .Values.config.checkpoint.criu.linkRemap }}
extMasters: {{ .Values.config.checkpoint.criu.extMasters }}
manageCgroupsMode: {{ .Values.config.checkpoint.criu.manageCgroupsMode | quote }}
# Advanced options
autoDedup: {{ .Values.config.checkpoint.criu.autoDedup }}
lazyPages: {{ .Values.config.checkpoint.criu.lazyPages }}
# Config file options (NOT available via RPC)
libDir: {{ .Values.config.checkpoint.criu.libDir | quote }}
allowUprobes: {{ .Values.config.checkpoint.criu.allowUprobes }}
skipInFlight: {{ .Values.config.checkpoint.criu.skipInFlight }}
rootfsExclusions:
# System directories excluded from rootfs diff (NVIDIA GPU Operator injected)
systemDirs: {{ toYaml .Values.config.checkpoint.rootfsExclusions.systemDirs | nindent 10 }}
# Cache directories to exclude (reduces checkpoint size)
cacheDirs: {{ toYaml .Values.config.checkpoint.rootfsExclusions.cacheDirs | nindent 10 }}
# Additional custom exclusions
additionalExclusions: {{ toYaml .Values.config.checkpoint.rootfsExclusions.additionalExclusions | nindent 10 }}
# NOTE: Restore runtime configuration is NOT in this ConfigMap.
# Placeholder containers do not mount it. Restore uses hardcoded defaults
# + operator-injected env vars. CRIU options come from saved checkpoint manifest.
criu:
binaryPath: {{ .Values.config.criu.binaryPath | quote }}
ghostLimit: {{ .Values.config.criu.ghostLimit }}
logLevel: {{ .Values.config.criu.logLevel }}
workDir: {{ .Values.config.criu.workDir | quote }}
leaveRunning: {{ .Values.config.criu.leaveRunning }}
shellJob: {{ .Values.config.criu.shellJob }}
tcpClose: {{ .Values.config.criu.tcpClose }}
fileLocks: {{ .Values.config.criu.fileLocks }}
orphanPtsMaster: {{ .Values.config.criu.orphanPtsMaster }}
extUnixSk: {{ .Values.config.criu.extUnixSk }}
linkRemap: {{ .Values.config.criu.linkRemap }}
extMasters: {{ .Values.config.criu.extMasters }}
manageCgroupsMode: {{ .Values.config.criu.manageCgroupsMode | quote }}
rstSibling: {{ .Values.config.criu.rstSibling }}
mntnsCompatMode: {{ .Values.config.criu.mntnsCompatMode }}
evasiveDevices: {{ .Values.config.criu.evasiveDevices }}
forceIrmap: {{ .Values.config.criu.forceIrmap }}
autoDedup: {{ .Values.config.criu.autoDedup }}
lazyPages: {{ .Values.config.criu.lazyPages }}
libDir: {{ .Values.config.criu.libDir | quote }}
allowUprobes: {{ .Values.config.criu.allowUprobes }}
skipInFlight: {{ .Values.config.criu.skipInFlight }}
......@@ -8,17 +8,14 @@ metadata:
namespace: {{ .Release.Namespace }}
labels:
{{- include "chrek.labels" . | nindent 4 }}
app.kubernetes.io/component: checkpoint-agent
spec:
selector:
matchLabels:
app.kubernetes.io/name: checkpoint-agent
app.kubernetes.io/instance: {{ .Release.Name }}
{{- include "chrek.selectorLabels" . | nindent 6 }}
template:
metadata:
labels:
app.kubernetes.io/name: checkpoint-agent
app.kubernetes.io/instance: {{ .Release.Name }}
{{- include "chrek.selectorLabels" . | nindent 8 }}
{{- with .Values.daemonset.podLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
......@@ -45,10 +42,8 @@ spec:
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.daemonset.runtimeClassName }}
# Use specified runtime class for GPU access (e.g., nvidia for CUDA checkpointing)
runtimeClassName: {{ .Values.daemonset.runtimeClassName }}
{{- end }}
# CUDA checkpoint/restore requires the nvidia container runtime
runtimeClassName: nvidia
{{- if .Values.seccomp.deploy }}
initContainers:
# Deploy seccomp profile to host before starting the agent
......@@ -109,12 +104,15 @@ spec:
- name: containerd-storage
mountPath: /var/lib/containerd
readOnly: true
# Mount host proc for CRIU and signal file writing
# Mount host proc for CRIU and runtime PID signaling
- name: host-proc
mountPath: /host/proc
# Mount host cgroup for CRIU
# Mount host cgroup for CRIU (write access needed for cgroup freezer)
- name: host-cgroup
mountPath: /sys/fs/cgroup
# Kubelet PodResources API socket (for GPU UUID discovery)
- name: kubelet-pod-resources
mountPath: /var/lib/kubelet/pod-resources
readOnly: true
{{- if and (eq .Values.storage.type "oci") .Values.storage.oci.credentialsSecretRef }}
# Mount docker config for OCI registry auth
......@@ -165,7 +163,7 @@ spec:
hostPath:
path: /var/lib/containerd
type: Directory
# Host proc (for CRIU and signal files - needs write access)
# Host proc (for CRIU and runtime signaling - needs write access)
- name: host-proc
hostPath:
path: /proc
......@@ -175,6 +173,11 @@ spec:
hostPath:
path: /sys/fs/cgroup
type: Directory
# Kubelet PodResources API socket directory
- name: kubelet-pod-resources
hostPath:
path: /var/lib/kubelet/pod-resources
type: Directory
{{- if and (eq .Values.storage.type "oci") .Values.storage.oci.credentialsSecretRef }}
- name: docker-config
secret:
......@@ -184,4 +187,3 @@ spec:
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
......@@ -12,10 +12,14 @@ metadata:
{{- include "chrek.labels" . | nindent 4 }}
app.kubernetes.io/component: checkpoint-agent
rules:
# Watch pods in this namespace to detect checkpoint-source pods becoming ready
# Watch and annotate pods in this namespace to drive checkpoint/restore lifecycle
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
verbs: ["get", "list", "watch", "patch", "update"]
# Emit operational events on pod/restore lifecycle updates
- apiGroups: [""]
resources: ["events"]
verbs: ["create"]
{{- else }}
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
......@@ -25,10 +29,13 @@ metadata:
{{- include "chrek.labels" . | nindent 4 }}
app.kubernetes.io/component: checkpoint-agent
rules:
# Watch pods cluster-wide to detect checkpoint-source pods on assigned nodes
# Watch and annotate pods cluster-wide on assigned nodes
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
verbs: ["get", "list", "watch", "patch", "update"]
# Emit operational events on pod/restore lifecycle updates
- apiGroups: [""]
resources: ["events"]
verbs: ["create"]
{{- end }}
{{- end }}
......@@ -63,10 +63,10 @@ daemonset:
# Resource limits and requests
resources:
limits:
cpu: 2
cpu: 4
memory: 4Gi
requests:
cpu: 500m
cpu: 2
memory: 1Gi
# Node selector - target GPU nodes
......@@ -82,9 +82,6 @@ daemonset:
operator: Exists
effect: NoSchedule
# Runtime class name for GPU access
runtimeClassName: nvidia
# Pod labels
podLabels: {}
......@@ -120,60 +117,62 @@ rbac:
namespaceRestricted: true
# Static configuration (loaded from ConfigMap)
# Dynamic values (NODE_NAME, RESTRICTED_NAMESPACE, etc.) still come from environment variables
# Dynamic values (NODE_NAME, RESTRICTED_NAMESPACE, etc.) come from environment variables
config:
agent:
# How checkpoints are triggered: "http" or "watcher"
signalSource: "watcher"
# HTTP server address for health checks and API
listenAddr: ":8080"
checkpoint:
criu:
# Ghost file size limit in bytes (512MB recommended for GPU workloads)
ghostLimit: 536870912
# CRIU timeout in seconds (6 hours for large GPU checkpoints)
timeout: 21600
# CRIU logging verbosity (0-4)
logLevel: 4
# CRIU work directory for temporary files
workDir: "/var/criu-work"
# K8s-specific options (recommended defaults for containers)
leaveRunning: true # Keep process running after checkpoint
shellJob: true # Containers are often session leaders
tcpClose: true # Pod IPs change on restore/migration
fileLocks: true # Applications use file locks
orphanPtsMaster: true # Containers with TTYs
extUnixSk: true # External Unix sockets
linkRemap: true # Handle deleted-but-open files
extMasters: true # External bind mount masters
manageCgroupsMode: "ignore" # Let K8s manage cgroups (ignore/soft/full/strict)
# Advanced options
autoDedup: false # Auto-deduplication of memory pages
lazyPages: false # Lazy page migration (experimental)
# Config file options (NOT available via RPC - written to criu.conf)
libDir: "/usr/local/lib/criu" # Plugin directory (required for GPU checkpoints)
allowUprobes: true # Required for CUDA
skipInFlight: true # Skip in-flight TCP connections
rootfsExclusions:
# System directories excluded from rootfs diff capture
# These are injected by NVIDIA GPU Operator and cause conflicts during restore
systemDirs:
- "./usr"
- "./etc"
- "./opt"
- "./var"
- "./run"
# Cache directories to exclude (reduces checkpoint size)
cacheDirs:
- "./.cache/huggingface"
# Additional custom exclusions (application-specific)
additionalExclusions: []
# NOTE: Restore configuration is NOT in this ConfigMap.
# Placeholder containers do not mount it. Restore defaults are hardcoded in Go.
# CRIU options for restore come from the saved checkpoint manifest (manifest.yaml).
overlay:
# Virtual FS dirs are COW artifacts in the overlay upperdir.
systemDirs:
- /proc
- /sys
- /dev
# Cache directories to exclude (reduces checkpoint size)
cacheDirs:
- /.cache/huggingface
# Python bytecode is already loaded in memory at restore time and
# regenerated automatically on cold start.
additionalExclusions:
- "*/__pycache__"
- "*.pyc"
restore:
# Path to the nsrestore binary in the placeholder image
nsRestorePath: /usr/local/bin/nsrestore
# Maximum seconds to wait for a restored pod to become Ready (0 = no timeout)
restoreReadyTimeoutSeconds: 0
criu:
# Path to the criu binary
binaryPath: /usr/local/sbin/criu
# Ghost file size limit in bytes. Deleted-but-open files smaller than this
# are saved inline in the checkpoint image as ghost files.
ghostLimit: 536870912
# CRIU logging verbosity (0-4)
logLevel: 4
# CRIU work directory for temporary files
workDir: /var/criu-work
# K8s-specific options (recommended defaults for containers)
leaveRunning: true # Keep process running after checkpoint
shellJob: true # Containers are often session leaders
tcpClose: true # Pod IPs change on restore/migration
fileLocks: true # Applications use file locks
orphanPtsMaster: true # Containers with TTYs
extUnixSk: true # External Unix sockets
linkRemap: true # Required for deleted-but-open files (e.g. POSIX semaphores in /dev/shm)
extMasters: true # External bind mount masters
manageCgroupsMode: soft # CRIU cgroup management mode (ignore/soft/full/strict)
# Restore-specific options (only apply during CRIU restore, not dump)
rstSibling: true # Restore as sibling process (required for go-criu swrk mode)
mntnsCompatMode: false # Mount namespace compatibility mode
evasiveDevices: true # Use any device path if original is inaccessible
forceIrmap: true # Force resolving inotify/fsnotify watch names
# Advanced options
autoDedup: false # Auto-deduplication of memory pages
lazyPages: false # Lazy page migration (experimental)
# Config file options (NOT available via RPC - written to criu.conf)
libDir: "" # Keep empty: external CUDA checkpoint/restore only (no CRIU CUDA plugin)
allowUprobes: true # Leave enabled for kernel/userspace probe compatibility
skipInFlight: true # Skip in-flight TCP connections
......@@ -165,7 +165,6 @@ The chart includes built-in validation to prevent all operator conflicts:
| dynamo-operator.checkpoint.readyForCheckpointFilePath | string | `"/tmp/ready-for-checkpoint"` | Path written by worker when model is loaded and ready for checkpointing |
| dynamo-operator.checkpoint.restoreMarkerFilePath | string | `"/tmp/dynamo-restored"` | Path written by restore-entrypoint after successful CRIU restore |
| dynamo-operator.checkpoint.storage.type | string | `"pvc"` | Storage backend type: pvc, s3, or oci |
| dynamo-operator.checkpoint.storage.signalHostPath | string | `"/var/lib/chrek/signals"` | Host path for signal files (communication between checkpoint pod and DaemonSet) |
| dynamo-operator.checkpoint.storage.pvc.pvcName | string | `"chrek-pvc"` | Name of the PVC created by the chrek chart |
| dynamo-operator.checkpoint.storage.pvc.basePath | string | `"/checkpoints"` | Base path within the PVC for storing checkpoints |
| dynamo-operator.checkpoint.storage.s3.uri | string | `""` | S3 URI in format: s3://[endpoint/]bucket/prefix |
......
......@@ -148,10 +148,7 @@ spec:
{{- if .Values.checkpoint.enabled }}
- --checkpoint-enabled=true
- --checkpoint-storage-type={{ .Values.checkpoint.storage.type }}
- --checkpoint-signal-host-path={{ .Values.checkpoint.storage.signalHostPath }}
- --checkpoint-init-container-image={{ .Values.checkpoint.initContainerImage }}
- --checkpoint-ready-for-checkpoint-file-path={{ .Values.checkpoint.readyForCheckpointFilePath }}
- --checkpoint-restore-marker-file-path={{ .Values.checkpoint.restoreMarkerFilePath }}
{{- if eq .Values.checkpoint.storage.type "pvc" }}
- --checkpoint-pvc-name={{ .Values.checkpoint.storage.pvc.pvcName }}
- --checkpoint-pvc-base-path={{ .Values.checkpoint.storage.pvc.basePath }}
......
......@@ -155,18 +155,10 @@ checkpoint:
# Enable checkpoint/restore functionality
enabled: false
# Image used for init containers in checkpoint jobs (e.g., signal file cleanup)
# Defaults to busybox:latest if not specified
initContainerImage: "busybox:latest"
# Path written by worker when model is loaded and ready for checkpointing
# Must match the path expected by checkpoint-enabled runtime images
readyForCheckpointFilePath: "/tmp/ready-for-checkpoint"
# Path written by restore-entrypoint after successful CRIU restore
# Must match the path expected by checkpoint-enabled runtime images
restoreMarkerFilePath: "/tmp/dynamo-restored"
# Storage configuration
# These settings tell the operator where to find checkpoint storage
# Must match the configuration in the chrek chart
......@@ -174,9 +166,6 @@ checkpoint:
# Storage backend type: pvc, s3, or oci
type: pvc
# Host path for signal files (communication between checkpoint pod and DaemonSet)
signalHostPath: "/var/lib/chrek/signals"
# PVC configuration (used when type=pvc)
pvc:
# Name of the PVC created by the chrek chart
......
......@@ -219,15 +219,9 @@ dynamo-operator:
# -- Whether to enable checkpoint/restore functionality
enabled: false
# -- Image used for init containers in checkpoint jobs (e.g., signal file cleanup)
initContainerImage: "busybox:latest"
# -- Path written by worker when model is loaded and ready for checkpointing
readyForCheckpointFilePath: "/tmp/ready-for-checkpoint"
# -- Path written by restore-entrypoint after successful CRIU restore
restoreMarkerFilePath: "/tmp/dynamo-restored"
# Storage configuration
# These settings tell the operator where to find checkpoint storage
# Must match the configuration in the chrek chart
......@@ -235,9 +229,6 @@ dynamo-operator:
# -- Storage backend type: pvc, s3, or oci
type: pvc
# -- Host path for signal files (communication between checkpoint pod and DaemonSet)
signalHostPath: "/var/lib/chrek/signals"
# PVC storage configuration (used when type=pvc)
pvc:
# -- Name of the PVC created by the chrek chart
......
......@@ -128,8 +128,6 @@ const (
DynamoCheckpointConditionJobCreated DynamoCheckpointConditionType = "JobCreated"
// DynamoCheckpointConditionJobCompleted indicates whether the checkpoint Job has completed
DynamoCheckpointConditionJobCompleted DynamoCheckpointConditionType = "JobCompleted"
// DynamoCheckpointConditionTarAvailable indicates whether the checkpoint tar file exists
DynamoCheckpointConditionTarAvailable DynamoCheckpointConditionType = "TarAvailable"
)
// DynamoCheckpointStatus defines the observed state of DynamoCheckpoint
......
......@@ -161,16 +161,13 @@ func main() {
// Checkpoint configuration
var checkpointEnabled bool
var checkpointStorageType string
var checkpointSignalHostPath string
var checkpointPVCName string
var checkpointPVCBasePath string
var checkpointS3URI string
var checkpointS3CredentialsSecret string
var checkpointOCIURI string
var checkpointOCICredentialsSecret string
var checkpointInitContainerImage string
var checkpointReadyForCheckpointFilePath string
var checkpointRestoreMarkerFilePath string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
......@@ -231,8 +228,6 @@ func main() {
"Enable checkpoint/restore functionality")
flag.StringVar(&checkpointStorageType, "checkpoint-storage-type", commonController.CheckpointStorageTypePVC,
"Checkpoint storage backend type: pvc, s3, or oci")
flag.StringVar(&checkpointSignalHostPath, "checkpoint-signal-host-path", "/var/lib/chrek/signals",
"Host path for signal files used for checkpoint job coordination")
flag.StringVar(&checkpointPVCName, "checkpoint-pvc-name", "chrek-pvc",
"Name of the PVC for checkpoint storage (used when storage-type=pvc)")
flag.StringVar(&checkpointPVCBasePath, "checkpoint-pvc-base-path", "/checkpoints",
......@@ -245,13 +240,9 @@ func main() {
"OCI URI for checkpoint storage: oci://registry/repository (used when storage-type=oci)")
flag.StringVar(&checkpointOCICredentialsSecret, "checkpoint-oci-credentials-secret", "",
"Docker config secret name for OCI registry auth (used when storage-type=oci)")
flag.StringVar(&checkpointInitContainerImage, "checkpoint-init-container-image", "busybox:latest",
"Image to use for checkpoint init containers (e.g., signal file cleanup)")
flag.StringVar(&checkpointReadyForCheckpointFilePath,
"checkpoint-ready-for-checkpoint-file-path", "/tmp/ready-for-checkpoint",
"Path written by the worker container when the model is loaded and ready for checkpointing")
flag.StringVar(&checkpointRestoreMarkerFilePath, "checkpoint-restore-marker-file-path", "/tmp/dynamo-restored",
"Path written by restore-entrypoint after successful CRIU restore")
opts := zap.Options{
Development: true,
}
......@@ -331,12 +322,9 @@ func main() {
DiscoveryBackend: discoveryBackend,
Checkpoint: commonController.CheckpointConfig{
Enabled: checkpointEnabled,
InitContainerImage: checkpointInitContainerImage,
ReadyForCheckpointFilePath: checkpointReadyForCheckpointFilePath,
RestoreMarkerFilePath: checkpointRestoreMarkerFilePath,
Storage: commonController.CheckpointStorageConfig{
Type: checkpointStorageType,
SignalHostPath: checkpointSignalHostPath,
Type: checkpointStorageType,
PVC: commonController.CheckpointPVCConfig{
PVCName: checkpointPVCName,
BasePath: checkpointPVCBasePath,
......
......@@ -64,12 +64,6 @@ func GetPVCBasePath(config *controller_common.CheckpointConfig) string {
return ""
}
// storageTypeToAPI converts controller_common storage type string to API enum
func storageTypeToAPI(storageType string) nvidiacomv1alpha1.DynamoCheckpointStorageType {
// Simply cast - the values match between controller constants and API enum
return nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
}
// CheckpointInfo contains resolved checkpoint information for a DGD service
type CheckpointInfo struct {
// Enabled indicates if checkpointing is enabled
......@@ -158,8 +152,7 @@ func ResolveCheckpointForService(
}
// InjectCheckpointEnvVars adds checkpoint-related environment variables to a restored/DGD container.
// Sets PATH, HASH, RESTORE_MARKER_FILE, and SKIP_WAIT_FOR_CHECKPOINT. The restore entrypoint constructs
// the full checkpoint location from PATH + "/" + HASH.
// Sets PATH and HASH so the restored process knows its checkpoint identity.
// DYN_CHECKPOINT_LOCATION is reserved for future S3/OCI support.
func InjectCheckpointEnvVars(container *corev1.Container, info *CheckpointInfo, checkpointConfig *controller_common.CheckpointConfig) {
if !info.Enabled {
......@@ -168,7 +161,7 @@ func InjectCheckpointEnvVars(container *corev1.Container, info *CheckpointInfo,
var envVars []corev1.EnvVar
// For PVC storage: inject base path so restore-entrypoint constructs location = path/hash.
// For PVC storage: inject base path so the restored process knows its checkpoint location.
// For S3/OCI (future): inject DYN_CHECKPOINT_LOCATION directly.
storageType := controller_common.CheckpointStorageTypePVC
if checkpointConfig != nil && checkpointConfig.Storage.Type != "" {
......@@ -201,19 +194,6 @@ func InjectCheckpointEnvVars(container *corev1.Container, info *CheckpointInfo,
Value: info.Hash,
})
}
if checkpointConfig != nil && checkpointConfig.RestoreMarkerFilePath != "" {
envVars = append(envVars, corev1.EnvVar{
Name: consts.EnvRestoreMarkerFile,
Value: checkpointConfig.RestoreMarkerFilePath,
})
}
// Tell the restore entrypoint to check once and cold-start if no checkpoint is ready.
// Without this (standalone/DaemonSet path), the entrypoint polls indefinitely.
envVars = append(envVars, corev1.EnvVar{
Name: consts.EnvSkipWaitForCheckpoint,
Value: "1",
})
// Prepend checkpoint env vars to ensure they're available
container.Env = append(envVars, container.Env...)
......@@ -255,51 +235,6 @@ func InjectCheckpointVolumeMount(container *corev1.Container, basePath string) {
})
}
// InjectCheckpointSignalVolume adds the checkpoint signal hostPath volume to a pod spec
// This is needed for CRIU mount namespace consistency between checkpoint and restore pods
func InjectCheckpointSignalVolume(podSpec *corev1.PodSpec, checkpointConfig *controller_common.CheckpointConfig) {
// Check if volume already exists
for _, v := range podSpec.Volumes {
if v.Name == consts.CheckpointSignalVolumeName {
return
}
}
// Get signal host path from config or use default
signalHostPath := ""
if checkpointConfig != nil {
signalHostPath = checkpointConfig.Storage.SignalHostPath
}
hostPathType := corev1.HostPathDirectoryOrCreate
podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{
Name: consts.CheckpointSignalVolumeName,
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: signalHostPath,
Type: &hostPathType,
},
},
})
}
// InjectCheckpointSignalVolumeMount adds the checkpoint signal volume mount to a container
// This is needed for CRIU mount namespace consistency between checkpoint and restore pods
func InjectCheckpointSignalVolumeMount(container *corev1.Container) {
// Check if mount already exists
for _, m := range container.VolumeMounts {
if m.Name == consts.CheckpointSignalVolumeName {
return
}
}
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: consts.CheckpointSignalVolumeName,
MountPath: consts.CheckpointSignalMountPath,
ReadOnly: false,
})
}
// InjectPodInfoVolume adds a Downward API volume for pod identity and DGD info.
// This is critical for CRIU checkpoint/restore scenarios where environment variables
// contain stale values from the checkpoint source pod. The Downward API files
......@@ -389,15 +324,19 @@ func InjectPodInfoVolumeMount(container *corev1.Container) {
})
}
// InjectCheckpointIntoPodSpec injects checkpoint configuration into a pod spec.
// This is the single entry point for ALL checkpoint-related pod modifications:
// 1. Command/Args transformation - moves Command to Args to respect image ENTRYPOINT
// 2. Security context - applies hostIPC and privileged mode for CRIU restore
// 3. Environment variables - injects checkpoint path, hash, and CRIU settings
// 4. Storage configuration - adds volumes and mounts based on storage type
// InjectCheckpointIntoPodSpec injects checkpoint configuration into a pod spec for
// external restore via the chrek DaemonSet. The pod image is expected to be a
// runtime-compatible restore image (runtime + CRIU tooling). For ready checkpoints,
// the operator overrides command to `sleep infinity` so the watcher can trigger
// external restore via nsenter + nsrestore.
//
// Modifications applied:
// 1. Security context - seccomp profile (io_uring blocking, matches checkpoint environment)
// 2. Environment variables - checkpoint path and hash
// 3. Storage configuration - checkpoint PVC and Downward API (pod identity)
//
// Takes CheckpointInfo (resolved by ResolveCheckpointForService) and checkpoint config.
// Returns error if checkpoint is enabled but configuration is invalid.
// No hostIPC, no privileged mode — those are only needed when CRIU runs inside the
// container. With external restore, all privilege lives in the DaemonSet.
func InjectCheckpointIntoPodSpec(
podSpec *corev1.PodSpec,
checkpointInfo *CheckpointInfo,
......@@ -407,11 +346,8 @@ func InjectCheckpointIntoPodSpec(
return nil
}
// Use the checkpoint info as-is (already computed by ResolveCheckpointForService)
// We only need to compute hash if it's not already set
info := checkpointInfo
if info.Hash == "" {
// Identity is required to compute the hash
if info.Identity == nil {
return fmt.Errorf("checkpoint enabled but identity is nil and hash is not set")
}
......@@ -422,7 +358,7 @@ func InjectCheckpointIntoPodSpec(
info.Hash = hash
}
// Find the main container first (needed for all modifications)
// Find the main container (needed for volume mounts and env vars)
var mainContainer *corev1.Container
for i := range podSpec.Containers {
if podSpec.Containers[i].Name == consts.MainContainerName {
......@@ -430,7 +366,6 @@ func InjectCheckpointIntoPodSpec(
break
}
}
// If no main container found by name, use the first container
if mainContainer == nil && len(podSpec.Containers) > 0 {
mainContainer = &podSpec.Containers[0]
}
......@@ -438,25 +373,16 @@ func InjectCheckpointIntoPodSpec(
return fmt.Errorf("no container found to inject checkpoint config")
}
// 1. Handle command/args for checkpoint-enabled images
// When checkpoint is enabled, the image ENTRYPOINT is /restore-entrypoint which
// decides between restore and cold start. We pass the user's command as arguments
// to this ENTRYPOINT (used as cold-start fallback if no checkpoint is ready).
if len(mainContainer.Command) > 0 {
// Combine Command + Args into a single Args array
// This allows the image's ENTRYPOINT to receive the full command as arguments
combinedArgs := append(mainContainer.Command, mainContainer.Args...)
mainContainer.Args = combinedArgs
mainContainer.Command = nil // Clear Command to use image's ENTRYPOINT
// When a ready checkpoint exists, override the container command to sleep infinity.
// The DaemonSet watcher detects this pod via the checkpoint-restore label and
// performs external restore (nsenter + nsrestore). When no checkpoint is ready,
// the original command runs (cold start).
if info.Ready {
mainContainer.Command = []string{"sleep", "infinity"}
mainContainer.Args = nil
}
// If Command is empty but Args exists, keep Args as-is (they'll be passed to ENTRYPOINT)
// 2. Apply pod-level security context for CRIU restore
// hostIPC: Required for CRIU to access shared memory segments and IPC resources
podSpec.HostIPC = true
// Apply seccomp profile to match checkpoint environment
// This blocks io_uring syscalls required for CRIU compatibility
// Seccomp profile to match checkpoint environment (blocks io_uring syscalls)
if podSpec.SecurityContext == nil {
podSpec.SecurityContext = &corev1.PodSecurityContext{}
}
......@@ -465,13 +391,6 @@ func InjectCheckpointIntoPodSpec(
LocalhostProfile: ptr.To(consts.SeccompProfilePath),
}
// Apply container-level security context for CRIU restore
// Privileged mode is required for CRIU restore operations
if mainContainer.SecurityContext == nil {
mainContainer.SecurityContext = &corev1.SecurityContext{}
}
mainContainer.SecurityContext.Privileged = ptr.To(true)
// Determine storage type and compute location/path
storageType := controller_common.CheckpointStorageTypePVC // default
var storageConfig *controller_common.CheckpointStorageConfig
......@@ -484,27 +403,21 @@ func InjectCheckpointIntoPodSpec(
switch storageType {
case controller_common.CheckpointStorageTypeS3:
// S3 storage: location is s3:// URI
// URI format: s3://[endpoint/]bucket/prefix
info.StorageType = storageTypeToAPI(storageType)
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
if storageConfig == nil || storageConfig.S3.URI == "" {
return fmt.Errorf("S3 storage type selected but no S3 URI configured (set checkpoint.storage.s3.uri)")
}
info.Location = fmt.Sprintf("%s/%s.tar", storageConfig.S3.URI, info.Hash)
case controller_common.CheckpointStorageTypeOCI:
// OCI storage: location is oci:// URI
// URI format: oci://registry/repository
info.StorageType = storageTypeToAPI(storageType)
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
if storageConfig == nil || storageConfig.OCI.URI == "" {
return fmt.Errorf("OCI storage type selected but no OCI URI configured (set checkpoint.storage.oci.uri)")
}
info.Location = fmt.Sprintf("%s:%s", storageConfig.OCI.URI, info.Hash)
default: // controller_common.CheckpointStorageTypePVC
// PVC storage: location is the checkpoint directory
// k8s-runc-bypass expects: /checkpoints/{hash}/ (directory with checkpoint data)
info.StorageType = storageTypeToAPI(storageType)
default: // PVC
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
basePath := getPVCBasePath(storageConfig)
if storageConfig == nil || storageConfig.PVC.PVCName == "" {
return fmt.Errorf("PVC storage type selected but no PVC name configured (set checkpoint.storage.pvc.pvcName)")
......@@ -515,31 +428,22 @@ func InjectCheckpointIntoPodSpec(
}
info.Location = fmt.Sprintf("%s/%s", basePath, info.Hash)
// Inject PVC volume and mount (only for PVC storage)
InjectCheckpointVolume(podSpec, pvcName)
InjectCheckpointVolumeMount(mainContainer, basePath)
}
// Inject signal volume for CRIU mount namespace consistency
// Even though restore pods don't use the signal file, they need it mounted
// to match the checkpoint job's mount namespace for CRIU compatibility
InjectCheckpointSignalVolume(podSpec, checkpointConfig)
InjectCheckpointSignalVolumeMount(mainContainer)
// Inject Downward API volume for pod identity after CRIU restore
// CRIU preserves environment variables from checkpoint time, so pod identity
// env vars (POD_NAME, POD_UID, POD_NAMESPACE) contain stale values.
// The Dynamo runtime reads from /etc/podinfo/ files first to get correct identity.
// Downward API volume for pod identity after CRIU restore
InjectPodInfoVolume(podSpec)
InjectPodInfoVolumeMount(mainContainer)
// Inject checkpoint environment variables (for all storage types)
// Checkpoint environment variables (path, hash)
InjectCheckpointEnvVars(mainContainer, info, checkpointConfig)
return nil
}
// InjectCheckpointLabelsFromConfig adds checkpoint labels to a label map based on config
// InjectCheckpointLabelsFromConfig adds checkpoint identity labels to a label map based on config.
// Restore trigger labels are injected only when a concrete restore request is prepared.
func InjectCheckpointLabelsFromConfig(labels map[string]string, config *nvidiacomv1alpha1.ServiceCheckpointConfig) (map[string]string, error) {
if config == nil || !config.Enabled {
return labels, nil
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package checkpoint
import (
"context"
"testing"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
controller_common "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
const (
testHash = "abc123def4567890"
testNamespace = "default"
)
func testPVCConfig() *controller_common.CheckpointConfig {
return &controller_common.CheckpointConfig{
Enabled: true,
Storage: controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypePVC,
PVC: controller_common.CheckpointPVCConfig{
PVCName: "chrek-pvc",
BasePath: "/checkpoints",
},
},
}
}
func testIdentity() nvidiacomv1alpha1.DynamoCheckpointIdentity {
return nvidiacomv1alpha1.DynamoCheckpointIdentity{
Model: "meta-llama/Llama-2-7b-hf",
BackendFramework: "vllm",
}
}
func testPodSpec() *corev1.PodSpec {
return &corev1.PodSpec{
Containers: []corev1.Container{{
Name: consts.MainContainerName,
Image: "test-image:latest",
Command: []string{"python3"},
Args: []string{"-m", "dynamo.vllm"},
}},
}
}
func testScheme() *runtime.Scheme {
s := runtime.NewScheme()
_ = nvidiacomv1alpha1.AddToScheme(s)
_ = corev1.AddToScheme(s)
return s
}
func testInfo() *CheckpointInfo {
return &CheckpointInfo{Enabled: true, Hash: testHash}
}
// --- Helper function tests ---
func TestHelpers(t *testing.T) {
// GetPVCBasePath
assert.Equal(t, "", GetPVCBasePath(nil))
assert.Equal(t, "/checkpoints", GetPVCBasePath(testPVCConfig()))
// getCheckpointInfoFromCheckpoint — ready
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: "ckpt-abc"},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: testIdentity()},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseReady, IdentityHash: testHash,
Location: "/checkpoints/" + testHash, StorageType: "pvc",
},
}
info := getCheckpointInfoFromCheckpoint(ckpt)
assert.True(t, info.Enabled)
assert.True(t, info.Ready)
assert.Equal(t, testHash, info.Hash)
assert.Equal(t, "/checkpoints/"+testHash, info.Location)
// getCheckpointInfoFromCheckpoint — not ready
ckpt.Status.Phase = nvidiacomv1alpha1.DynamoCheckpointPhaseCreating
info = getCheckpointInfoFromCheckpoint(ckpt)
assert.False(t, info.Ready)
}
// --- Injection idempotency tests ---
func TestInjectionIdempotency(t *testing.T) {
// Volume injection is idempotent
podSpec := &corev1.PodSpec{Volumes: []corev1.Volume{{Name: consts.CheckpointVolumeName}, {Name: consts.PodInfoVolumeName}}}
InjectCheckpointVolume(podSpec, "chrek-pvc")
InjectPodInfoVolume(podSpec)
assert.Len(t, podSpec.Volumes, 2)
// Mount injection is idempotent
container := &corev1.Container{VolumeMounts: []corev1.VolumeMount{
{Name: consts.CheckpointVolumeName}, {Name: consts.PodInfoVolumeName},
}}
InjectCheckpointVolumeMount(container, "/checkpoints")
InjectPodInfoVolumeMount(container)
assert.Len(t, container.VolumeMounts, 2)
}
// --- InjectCheckpointEnvVars tests ---
func TestInjectCheckpointEnvVars(t *testing.T) {
t.Run("PVC storage injects PATH and HASH", func(t *testing.T) {
container := &corev1.Container{}
InjectCheckpointEnvVars(container, testInfo(), testPVCConfig())
envMap := make(map[string]string, len(container.Env))
for _, e := range container.Env {
envMap[e.Name] = e.Value
}
assert.Equal(t, "/checkpoints", envMap[consts.EnvCheckpointPath])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
_, hasLocation := envMap[consts.EnvCheckpointLocation]
assert.False(t, hasLocation)
})
t.Run("S3 storage injects LOCATION and HASH", func(t *testing.T) {
container := &corev1.Container{}
info := &CheckpointInfo{Enabled: true, Hash: testHash, Location: "s3://bucket/" + testHash + ".tar"}
config := &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypeS3,
S3: controller_common.CheckpointS3Config{URI: "s3://bucket"},
},
}
InjectCheckpointEnvVars(container, info, config)
envMap := make(map[string]string, len(container.Env))
for _, e := range container.Env {
envMap[e.Name] = e.Value
}
assert.Equal(t, "s3://bucket/"+testHash+".tar", envMap[consts.EnvCheckpointLocation])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
})
t.Run("disabled is a no-op", func(t *testing.T) {
container := &corev1.Container{}
InjectCheckpointEnvVars(container, &CheckpointInfo{Enabled: false}, testPVCConfig())
assert.Empty(t, container.Env)
})
t.Run("preserves existing env vars", func(t *testing.T) {
container := &corev1.Container{Env: []corev1.EnvVar{{Name: "EXISTING", Value: "keep"}}}
InjectCheckpointEnvVars(container, testInfo(), testPVCConfig())
envMap := make(map[string]string, len(container.Env))
for _, e := range container.Env {
envMap[e.Name] = e.Value
}
assert.Equal(t, "keep", envMap["EXISTING"])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
})
}
// --- InjectCheckpointLabelsFromConfig tests ---
func TestInjectCheckpointLabelsFromConfig(t *testing.T) {
// Disabled/nil configs are no-ops
for _, cfg := range []*nvidiacomv1alpha1.ServiceCheckpointConfig{nil, {Enabled: false}} {
labels := map[string]string{"existing": "value"}
result, err := InjectCheckpointLabelsFromConfig(labels, cfg)
require.NoError(t, err)
assert.Equal(t, map[string]string{"existing": "value"}, result)
}
// Enabled with identity adds hash label
identity := testIdentity()
result, err := InjectCheckpointLabelsFromConfig(nil, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, Identity: &identity,
})
require.NoError(t, err)
hash, ok := result[consts.KubeLabelCheckpointHash]
assert.True(t, ok)
assert.Len(t, hash, 16)
// Enabled without identity does not add hash
result, err = InjectCheckpointLabelsFromConfig(map[string]string{}, &nvidiacomv1alpha1.ServiceCheckpointConfig{Enabled: true})
require.NoError(t, err)
_, ok = result[consts.KubeLabelCheckpointHash]
assert.False(t, ok)
}
// --- InjectCheckpointIntoPodSpec tests ---
func TestInjectCheckpointIntoPodSpec(t *testing.T) {
t.Run("nil or disabled info is a no-op", func(t *testing.T) {
for _, info := range []*CheckpointInfo{nil, {Enabled: false}} {
podSpec := testPodSpec()
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, info, testPVCConfig()))
assert.Equal(t, []string{"python3"}, podSpec.Containers[0].Command)
}
})
t.Run("ready checkpoint overrides command to sleep infinity", func(t *testing.T) {
podSpec := testPodSpec()
info := &CheckpointInfo{Enabled: true, Ready: true, Hash: testHash}
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, info, testPVCConfig()))
assert.Equal(t, []string{"sleep", "infinity"}, podSpec.Containers[0].Command)
assert.Nil(t, podSpec.Containers[0].Args)
})
t.Run("not-ready checkpoint preserves original command", func(t *testing.T) {
podSpec := testPodSpec()
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, testInfo(), testPVCConfig()))
assert.Equal(t, []string{"python3"}, podSpec.Containers[0].Command)
})
t.Run("sets seccomp profile", func(t *testing.T) {
podSpec := testPodSpec()
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, testInfo(), testPVCConfig()))
require.NotNil(t, podSpec.SecurityContext)
require.NotNil(t, podSpec.SecurityContext.SeccompProfile)
assert.Equal(t, corev1.SeccompProfileTypeLocalhost, podSpec.SecurityContext.SeccompProfile.Type)
assert.Equal(t, consts.SeccompProfilePath, *podSpec.SecurityContext.SeccompProfile.LocalhostProfile)
})
t.Run("preserves existing security context", func(t *testing.T) {
podSpec := testPodSpec()
podSpec.SecurityContext = &corev1.PodSecurityContext{RunAsUser: ptr.To(int64(1000))}
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, testInfo(), testPVCConfig()))
assert.Equal(t, int64(1000), *podSpec.SecurityContext.RunAsUser)
require.NotNil(t, podSpec.SecurityContext.SeccompProfile)
})
t.Run("PVC storage injects volumes, mounts, and env vars", func(t *testing.T) {
podSpec := testPodSpec()
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, testInfo(), testPVCConfig()))
// Volumes
volNames := make(map[string]bool)
for _, v := range podSpec.Volumes {
volNames[v.Name] = true
if v.Name == consts.CheckpointVolumeName {
assert.Equal(t, "chrek-pvc", v.PersistentVolumeClaim.ClaimName)
}
}
assert.True(t, volNames[consts.CheckpointVolumeName])
assert.True(t, volNames[consts.PodInfoVolumeName])
// Mounts
mountPaths := make(map[string]string)
for _, m := range podSpec.Containers[0].VolumeMounts {
mountPaths[m.Name] = m.MountPath
}
assert.Equal(t, "/checkpoints", mountPaths[consts.CheckpointVolumeName])
assert.Equal(t, consts.PodInfoMountPath, mountPaths[consts.PodInfoVolumeName])
// Env
envMap := make(map[string]string, len(podSpec.Containers[0].Env))
for _, e := range podSpec.Containers[0].Env {
envMap[e.Name] = e.Value
}
assert.Equal(t, "/checkpoints", envMap[consts.EnvCheckpointPath])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
})
t.Run("computes hash from identity when hash is empty", func(t *testing.T) {
podSpec := testPodSpec()
identity := testIdentity()
info := &CheckpointInfo{Enabled: true, Identity: &identity}
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, info, testPVCConfig()))
assert.Len(t, info.Hash, 16)
})
t.Run("S3 and OCI storage set location", func(t *testing.T) {
for _, tc := range []struct {
storageType string
config controller_common.CheckpointStorageConfig
wantLoc string
}{
{"s3", controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypeS3,
S3: controller_common.CheckpointS3Config{URI: "s3://bucket/prefix"},
}, "s3://bucket/prefix/" + testHash + ".tar"},
{"oci", controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypeOCI,
OCI: controller_common.CheckpointOCIConfig{URI: "oci://registry/repo"},
}, "oci://registry/repo:" + testHash},
} {
t.Run(tc.storageType, func(t *testing.T) {
podSpec := testPodSpec()
info := &CheckpointInfo{Enabled: true, Hash: testHash}
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, info, &controller_common.CheckpointConfig{Storage: tc.config}))
assert.Equal(t, tc.wantLoc, info.Location)
})
}
})
t.Run("error cases", func(t *testing.T) {
for _, tc := range []struct {
name string
podSpec *corev1.PodSpec
info *CheckpointInfo
config *controller_common.CheckpointConfig
errMsg string
}{
{"hash empty and identity nil", testPodSpec(), &CheckpointInfo{Enabled: true}, testPVCConfig(), "identity is nil"},
{"no containers", &corev1.PodSpec{}, testInfo(), testPVCConfig(), "no container found"},
{"PVC name missing", testPodSpec(), testInfo(), &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{Type: "pvc", PVC: controller_common.CheckpointPVCConfig{BasePath: "/checkpoints"}},
}, "no PVC name"},
{"PVC base path missing", testPodSpec(), testInfo(), &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{Type: "pvc", PVC: controller_common.CheckpointPVCConfig{PVCName: "chrek-pvc"}},
}, "no PVC base path"},
{"S3 URI missing", testPodSpec(), testInfo(), &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{Type: "s3"},
}, "S3"},
{"OCI URI missing", testPodSpec(), testInfo(), &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{Type: "oci"},
}, "OCI"},
} {
t.Run(tc.name, func(t *testing.T) {
err := InjectCheckpointIntoPodSpec(tc.podSpec, tc.info, tc.config)
require.Error(t, err)
assert.Contains(t, err.Error(), tc.errMsg)
})
}
})
t.Run("falls back to first container when main not found", func(t *testing.T) {
podSpec := &corev1.PodSpec{Containers: []corev1.Container{{Name: "sidecar", Image: "img", Command: []string{"python3"}}}}
info := &CheckpointInfo{Enabled: true, Ready: true, Hash: testHash}
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, info, testPVCConfig()))
assert.Equal(t, []string{"sleep", "infinity"}, podSpec.Containers[0].Command)
})
}
// --- ResolveCheckpointForService tests ---
func TestResolveCheckpointForService(t *testing.T) {
ctx := context.Background()
s := testScheme()
t.Run("nil or disabled config returns disabled", func(t *testing.T) {
c := fake.NewClientBuilder().WithScheme(s).Build()
for _, cfg := range []*nvidiacomv1alpha1.ServiceCheckpointConfig{nil, {Enabled: false}} {
info, err := ResolveCheckpointForService(ctx, c, testNamespace, cfg)
require.NoError(t, err)
assert.False(t, info.Enabled)
}
})
t.Run("checkpointRef resolves ready CR", func(t *testing.T) {
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: "my-ckpt", Namespace: testNamespace},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: testIdentity()},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseReady, IdentityHash: testHash,
Location: "/checkpoints/" + testHash, StorageType: "pvc",
},
}
c := fake.NewClientBuilder().WithScheme(s).WithObjects(ckpt).WithStatusSubresource(ckpt).Build()
ref := "my-ckpt"
info, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, CheckpointRef: &ref,
})
require.NoError(t, err)
assert.True(t, info.Ready)
assert.Equal(t, testHash, info.Hash)
assert.Equal(t, "/checkpoints/"+testHash, info.Location)
})
t.Run("checkpointRef resolves not-ready CR", func(t *testing.T) {
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: "pending-ckpt", Namespace: testNamespace},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: testIdentity()},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseCreating},
}
c := fake.NewClientBuilder().WithScheme(s).WithObjects(ckpt).WithStatusSubresource(ckpt).Build()
ref := "pending-ckpt"
info, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, CheckpointRef: &ref,
})
require.NoError(t, err)
assert.False(t, info.Ready)
})
t.Run("checkpointRef errors when CR not found", func(t *testing.T) {
c := fake.NewClientBuilder().WithScheme(s).Build()
ref := "nonexistent"
_, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, CheckpointRef: &ref,
})
assert.ErrorContains(t, err, "nonexistent")
})
t.Run("identity lookup finds existing checkpoint by label", func(t *testing.T) {
identity := testIdentity()
hash, err := ComputeIdentityHash(identity)
require.NoError(t, err)
ckpt := &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Name: hash, Namespace: testNamespace,
Labels: map[string]string{consts.KubeLabelCheckpointHash: hash},
},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{Identity: identity},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{
Phase: nvidiacomv1alpha1.DynamoCheckpointPhaseReady, IdentityHash: hash,
Location: "/checkpoints/" + hash, StorageType: "pvc",
},
}
c := fake.NewClientBuilder().WithScheme(s).WithObjects(ckpt).WithStatusSubresource(ckpt).Build()
info, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, Identity: &identity,
})
require.NoError(t, err)
assert.True(t, info.Ready)
assert.Equal(t, hash, info.Hash)
})
t.Run("identity lookup returns not-ready when no CR found", func(t *testing.T) {
c := fake.NewClientBuilder().WithScheme(s).Build()
identity := testIdentity()
info, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{
Enabled: true, Identity: &identity,
})
require.NoError(t, err)
assert.False(t, info.Ready)
assert.Len(t, info.Hash, 16)
})
t.Run("errors when enabled but no ref and no identity", func(t *testing.T) {
c := fake.NewClientBuilder().WithScheme(s).Build()
_, err := ResolveCheckpointForService(ctx, c, testNamespace, &nvidiacomv1alpha1.ServiceCheckpointConfig{Enabled: true})
assert.ErrorContains(t, err, "no checkpointRef or identity")
})
}
......@@ -132,24 +132,19 @@ const (
// deploy/chrek/pkg/config/constants.go. If you change a value here, update there too.
// Kubernetes labels
KubeLabelCheckpointSource = "nvidia.com/checkpoint-source" // Pod label that triggers DaemonSet auto-checkpoint
KubeLabelCheckpointHash = "nvidia.com/checkpoint-hash" // Checkpoint identity hash for deduplication
KubeLabelCheckpointName = "nvidia.com/checkpoint-name" // DynamoCheckpoint CR name reference
KubeLabelIsCheckpointSource = "nvidia.com/chrek-is-checkpoint-source" // Pod label that triggers DaemonSet auto-checkpoint
KubeLabelCheckpointHash = "nvidia.com/chrek-checkpoint-hash" // Checkpoint identity hash (= DynamoCheckpoint CR name)
KubeLabelIsRestoreTarget = "nvidia.com/chrek-is-restore-target" // Pod label that triggers DaemonSet auto-restore
// Environment variables injected into pods
EnvCheckpointStorageType = "DYN_CHECKPOINT_STORAGE_TYPE" // Storage backend (pvc, s3, oci) — checkpoint job pods only
EnvCheckpointLocation = "DYN_CHECKPOINT_LOCATION" // Full checkpoint URI — future S3/OCI; for PVC, use PATH+HASH instead
EnvCheckpointPath = "DYN_CHECKPOINT_PATH" // Base checkpoint directory (e.g., /checkpoints) — PVC restored pods
EnvCheckpointHash = "DYN_CHECKPOINT_HASH" // Identity hash — all checkpoint-related pods
EnvCheckpointSignalFile = "DYN_CHECKPOINT_SIGNAL_FILE" // Signal file path — checkpoint job pods
EnvReadyForCheckpointFile = "DYN_READY_FOR_CHECKPOINT_FILE" // Ready-for-checkpoint file path — checkpoint job pods
EnvRestoreMarkerFile = "DYN_RESTORE_MARKER_FILE" // Restore marker path — injected into restore and checkpoint job pods
EnvSkipWaitForCheckpoint = "SKIP_WAIT_FOR_CHECKPOINT" // Skip polling, check once — restored/DGD pods
// Checkpoint pod-internal constants
CheckpointVolumeName = "checkpoint-storage" // Pod-internal volume name for checkpoint PVC
CheckpointSignalVolumeName = "checkpoint-signal" // Pod-internal volume name for signal hostPath
CheckpointSignalMountPath = "/checkpoint-signal" // Mount path for signal volume inside pods
SignalFileCleanupInitContainerName = "cleanup-signal-file" // Init container that removes stale signal files before job starts
CheckpointVolumeName = "checkpoint-storage" // Pod-internal volume name for checkpoint PVC
// SeccompProfilePath is the localhost seccomp profile that blocks io_uring syscalls.
// Deployed to nodes by the chrek DaemonSet init container.
......
......@@ -207,14 +207,7 @@ func (r *CheckpointReconciler) handleCreating(ctx context.Context, ckpt *nvidiac
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionJobCompleted),
Status: metav1.ConditionTrue,
Reason: "JobSucceeded",
Message: "Checkpoint job completed successfully",
LastTransitionTime: metav1.Now(),
})
meta.SetStatusCondition(&ckpt.Status.Conditions, metav1.Condition{
Type: string(nvidiacomv1alpha1.DynamoCheckpointConditionTarAvailable),
Status: metav1.ConditionTrue,
Reason: "TarCreated",
Message: fmt.Sprintf("Checkpoint available at %s", ckpt.Status.Location),
Message: fmt.Sprintf("Checkpoint job completed, available at %s", ckpt.Status.Location),
LastTransitionTime: metav1.Now(),
})
......@@ -264,46 +257,8 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
if podTemplate.Labels == nil {
podTemplate.Labels = make(map[string]string)
}
podTemplate.Labels[consts.KubeLabelCheckpointName] = ckpt.Name
podTemplate.Labels[consts.KubeLabelCheckpointHash] = ckpt.Status.IdentityHash
podTemplate.Labels[consts.KubeLabelCheckpointSource] = "true"
// Add signal volume (hostPath for communication with DaemonSet)
// The DaemonSet writes a signal file after checkpoint is complete
hostPathType := corev1.HostPathDirectoryOrCreate
podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, corev1.Volume{
Name: consts.CheckpointSignalVolumeName,
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: r.Config.Checkpoint.Storage.SignalHostPath,
Type: &hostPathType,
},
},
})
// Compute the signal file path - unique per checkpoint hash
signalFilePath := consts.CheckpointSignalMountPath + "/" + ckpt.Status.IdentityHash
// Add initContainer to clean up any leftover signal file from previous runs
// This ensures a fresh start for each checkpoint job without affecting the checkpoint itself
// InitContainers complete before the main container starts, so they don't appear in the checkpoint
initContainerImage := r.Config.Checkpoint.InitContainerImage
podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers, corev1.Container{
Name: consts.SignalFileCleanupInitContainerName,
Image: initContainerImage,
Command: []string{
"sh",
"-c",
fmt.Sprintf("rm -f %s || true; echo 'Signal file cleanup complete'", signalFilePath),
},
VolumeMounts: []corev1.VolumeMount{
{
Name: consts.CheckpointSignalVolumeName,
MountPath: consts.CheckpointSignalMountPath,
},
},
})
podTemplate.Labels[consts.KubeLabelIsCheckpointSource] = "true"
// Add checkpoint env vars and volume mounts to main container
if len(podTemplate.Spec.Containers) > 0 {
......@@ -315,11 +270,6 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
// Add checkpoint-related env vars
mainContainer.Env = append(mainContainer.Env,
// Signal file: DaemonSet writes this after checkpoint completes
corev1.EnvVar{
Name: consts.EnvCheckpointSignalFile,
Value: signalFilePath,
},
// Ready file: Worker creates this when model is loaded
corev1.EnvVar{
Name: consts.EnvReadyForCheckpointFile,
......@@ -340,19 +290,6 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
Name: consts.EnvCheckpointStorageType,
Value: storageType,
},
// Restore marker: Written by restore-entrypoint after CRIU restore
corev1.EnvVar{
Name: consts.EnvRestoreMarkerFile,
Value: r.Config.Checkpoint.RestoreMarkerFilePath,
},
)
// Add signal volume mount (required for DaemonSet communication)
mainContainer.VolumeMounts = append(mainContainer.VolumeMounts,
corev1.VolumeMount{
Name: consts.CheckpointSignalVolumeName,
MountPath: consts.CheckpointSignalMountPath,
},
)
// Add checkpoint PVC volume and mount for mount namespace consistency with restore pods
......@@ -423,7 +360,6 @@ func (r *CheckpointReconciler) buildCheckpointJob(ckpt *nvidiacomv1alpha1.Dynamo
Name: jobName,
Namespace: ckpt.Namespace,
Labels: map[string]string{
consts.KubeLabelCheckpointName: ckpt.Name,
consts.KubeLabelCheckpointHash: ckpt.Status.IdentityHash,
},
},
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"context"
"testing"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
controller_common "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
const (
testHash = "abc123def4567890"
testNamespace = "default"
)
func checkpointTestScheme() *runtime.Scheme {
s := runtime.NewScheme()
_ = nvidiacomv1alpha1.AddToScheme(s)
_ = corev1.AddToScheme(s)
_ = batchv1.AddToScheme(s)
return s
}
func checkpointTestConfig() controller_common.Config {
return controller_common.Config{
Checkpoint: controller_common.CheckpointConfig{
Enabled: true,
ReadyForCheckpointFilePath: "/tmp/ready-for-checkpoint",
Storage: controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypePVC,
PVC: controller_common.CheckpointPVCConfig{
PVCName: "chrek-pvc",
BasePath: "/checkpoints",
},
},
},
}
}
func makeCheckpointReconciler(s *runtime.Scheme, objs ...client.Object) *CheckpointReconciler {
return &CheckpointReconciler{
Client: fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).WithStatusSubresource(&nvidiacomv1alpha1.DynamoCheckpoint{}).Build(),
Config: checkpointTestConfig(),
Recorder: record.NewFakeRecorder(10),
}
}
func makeTestCheckpoint(name string, phase nvidiacomv1alpha1.DynamoCheckpointPhase) *nvidiacomv1alpha1.DynamoCheckpoint {
return &nvidiacomv1alpha1.DynamoCheckpoint{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: testNamespace},
Spec: nvidiacomv1alpha1.DynamoCheckpointSpec{
Identity: nvidiacomv1alpha1.DynamoCheckpointIdentity{
Model: "meta-llama/Llama-2-7b-hf",
BackendFramework: "vllm",
},
Job: nvidiacomv1alpha1.DynamoCheckpointJobConfig{
PodTemplateSpec: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "main",
Image: "test-image:latest",
Command: []string{"python3", "-m", "dynamo.vllm"},
Env: []corev1.EnvVar{{Name: "HF_TOKEN", Value: "secret"}},
}},
},
},
},
},
Status: nvidiacomv1alpha1.DynamoCheckpointStatus{Phase: phase},
}
}
func TestBuildCheckpointJob(t *testing.T) {
s := checkpointTestScheme()
ckpt := makeTestCheckpoint("test-ckpt", nvidiacomv1alpha1.DynamoCheckpointPhasePending)
ckpt.Status.IdentityHash = testHash
r := makeCheckpointReconciler(s, ckpt)
job := r.buildCheckpointJob(ckpt, "checkpoint-test-ckpt")
podSpec := job.Spec.Template.Spec
main := podSpec.Containers[0]
// Job and pod template labels
assert.Equal(t, testHash, job.Labels[consts.KubeLabelCheckpointHash])
assert.Equal(t, "true", job.Spec.Template.Labels[consts.KubeLabelIsCheckpointSource])
assert.Equal(t, testHash, job.Spec.Template.Labels[consts.KubeLabelCheckpointHash])
// Env vars (checkpoint-specific + user-provided preserved)
envMap := make(map[string]string, len(main.Env))
for _, e := range main.Env {
envMap[e.Name] = e.Value
}
assert.Equal(t, "/tmp/ready-for-checkpoint", envMap[consts.EnvReadyForCheckpointFile])
assert.Equal(t, testHash, envMap[consts.EnvCheckpointHash])
assert.Equal(t, "/checkpoints/"+testHash, envMap[consts.EnvCheckpointLocation])
assert.Equal(t, "pvc", envMap[consts.EnvCheckpointStorageType])
assert.Equal(t, "secret", envMap["HF_TOKEN"])
// Seccomp profile
require.NotNil(t, podSpec.SecurityContext)
require.NotNil(t, podSpec.SecurityContext.SeccompProfile)
assert.Equal(t, corev1.SeccompProfileTypeLocalhost, podSpec.SecurityContext.SeccompProfile.Type)
assert.Equal(t, consts.SeccompProfilePath, *podSpec.SecurityContext.SeccompProfile.LocalhostProfile)
// Probes: readiness set, liveness/startup cleared
require.NotNil(t, main.ReadinessProbe)
assert.Equal(t, []string{"cat", "/tmp/ready-for-checkpoint"}, main.ReadinessProbe.Exec.Command)
assert.Nil(t, main.LivenessProbe)
assert.Nil(t, main.StartupProbe)
// Checkpoint PVC volume + mount
volNames := make(map[string]bool)
for _, v := range podSpec.Volumes {
volNames[v.Name] = true
if v.Name == consts.CheckpointVolumeName {
require.NotNil(t, v.PersistentVolumeClaim)
assert.Equal(t, "chrek-pvc", v.PersistentVolumeClaim.ClaimName)
}
if v.Name == consts.PodInfoVolumeName {
require.NotNil(t, v.DownwardAPI)
}
}
assert.True(t, volNames[consts.CheckpointVolumeName])
assert.True(t, volNames[consts.PodInfoVolumeName])
mountPaths := make(map[string]string)
for _, m := range main.VolumeMounts {
mountPaths[m.Name] = m.MountPath
}
assert.Equal(t, "/checkpoints", mountPaths[consts.CheckpointVolumeName])
assert.Equal(t, consts.PodInfoMountPath, mountPaths[consts.PodInfoVolumeName])
// Restart policy, user image/command preserved
assert.Equal(t, corev1.RestartPolicyNever, podSpec.RestartPolicy)
assert.Equal(t, "test-image:latest", main.Image)
assert.Equal(t, []string{"python3", "-m", "dynamo.vllm"}, main.Command)
// Default deadlines
assert.Equal(t, int64(3600), *job.Spec.ActiveDeadlineSeconds)
assert.Equal(t, int32(3), *job.Spec.BackoffLimit)
assert.Equal(t, int32(300), *job.Spec.TTLSecondsAfterFinished)
// Custom deadlines override defaults
deadline := int64(7200)
backoff := int32(5)
ttl := int32(600)
ckpt.Spec.Job.ActiveDeadlineSeconds = &deadline
ckpt.Spec.Job.BackoffLimit = &backoff
ckpt.Spec.Job.TTLSecondsAfterFinished = &ttl
job = r.buildCheckpointJob(ckpt, "checkpoint-test-ckpt")
assert.Equal(t, int64(7200), *job.Spec.ActiveDeadlineSeconds)
assert.Equal(t, int32(5), *job.Spec.BackoffLimit)
assert.Equal(t, int32(600), *job.Spec.TTLSecondsAfterFinished)
}
func TestCheckpointReconciler_Reconcile(t *testing.T) {
s := checkpointTestScheme()
ctx := context.Background()
t.Run("not found returns no error", func(t *testing.T) {
r := makeCheckpointReconciler(s)
result, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: "nonexistent", Namespace: testNamespace},
})
require.NoError(t, err)
assert.Equal(t, ctrl.Result{}, result)
})
t.Run("new CR computes hash and sets Pending", func(t *testing.T) {
ckpt := makeTestCheckpoint("new-ckpt", "")
r := makeCheckpointReconciler(s, ckpt)
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: "new-ckpt", Namespace: testNamespace},
})
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "new-ckpt", Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhasePending, updated.Status.Phase)
assert.Len(t, updated.Status.IdentityHash, 16)
})
t.Run("Ready phase is a no-op", func(t *testing.T) {
ckpt := makeTestCheckpoint("ready-ckpt", nvidiacomv1alpha1.DynamoCheckpointPhaseReady)
ckpt.Status.IdentityHash = testHash
r := makeCheckpointReconciler(s, ckpt)
result, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: "ready-ckpt", Namespace: testNamespace},
})
require.NoError(t, err)
assert.Equal(t, ctrl.Result{}, result)
})
t.Run("unknown phase resets to Pending", func(t *testing.T) {
ckpt := makeTestCheckpoint("unknown-ckpt", "SomeUnknownPhase")
ckpt.Status.IdentityHash = testHash
r := makeCheckpointReconciler(s, ckpt)
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: types.NamespacedName{Name: "unknown-ckpt", Namespace: testNamespace},
})
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "unknown-ckpt", Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhasePending, updated.Status.Phase)
})
}
func TestCheckpointReconciler_HandleCreating(t *testing.T) {
s := checkpointTestScheme()
ctx := context.Background()
// Helper to create a checkpoint CR in Creating phase with a named job
makeCreatingCkpt := func(name, jobName string) *nvidiacomv1alpha1.DynamoCheckpoint {
ckpt := makeTestCheckpoint(name, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating)
ckpt.Status.IdentityHash = testHash
ckpt.Status.JobName = jobName
return ckpt
}
t.Run("succeeded job transitions to Ready", func(t *testing.T) {
ckpt := makeCreatingCkpt("ckpt-ok", "job-ok")
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "job-ok", Namespace: testNamespace},
Status: batchv1.JobStatus{Succeeded: 1},
}
r := makeCheckpointReconciler(s, ckpt, job)
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "ckpt-ok", Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseReady, updated.Status.Phase)
assert.Equal(t, "/checkpoints/"+testHash, updated.Status.Location)
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointStorageType("pvc"), updated.Status.StorageType)
assert.NotNil(t, updated.Status.CreatedAt)
})
t.Run("failed job transitions to Failed", func(t *testing.T) {
ckpt := makeCreatingCkpt("ckpt-fail", "job-fail")
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "job-fail", Namespace: testNamespace},
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{{Type: batchv1.JobFailed, Status: corev1.ConditionTrue}},
},
}
r := makeCheckpointReconciler(s, ckpt, job)
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "ckpt-fail", Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseFailed, updated.Status.Phase)
})
t.Run("running job keeps Creating phase", func(t *testing.T) {
ckpt := makeCreatingCkpt("ckpt-run", "job-run")
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "job-run", Namespace: testNamespace},
Status: batchv1.JobStatus{Active: 1},
}
r := makeCheckpointReconciler(s, ckpt, job)
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "ckpt-run", Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhaseCreating, updated.Status.Phase)
})
t.Run("deleted job resets to Pending", func(t *testing.T) {
ckpt := makeCreatingCkpt("ckpt-del", "job-deleted")
r := makeCheckpointReconciler(s, ckpt) // no job object
_, err := r.handleCreating(ctx, ckpt)
require.NoError(t, err)
updated := &nvidiacomv1alpha1.DynamoCheckpoint{}
require.NoError(t, r.Get(ctx, types.NamespacedName{Name: "ckpt-del", Namespace: testNamespace}, updated))
assert.Equal(t, nvidiacomv1alpha1.DynamoCheckpointPhasePending, updated.Status.Phase)
assert.Empty(t, updated.Status.JobName)
})
}
......@@ -76,6 +76,7 @@ type DynamoComponentDeploymentReconciler struct {
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponentdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponentdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponentdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocheckpoints,verbs=get;list
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
......@@ -951,6 +952,17 @@ func (r *DynamoComponentDeploymentReconciler) generateDeployment(ctx context.Con
}
}
// Checkpoint-restore pods must avoid overlap with prior replicas.
// Enforce Recreate whenever the rendered template is a restore target so
// the old pod is terminated before the restore placeholder is started.
if podTemplateSpec != nil &&
podTemplateSpec.Labels != nil &&
podTemplateSpec.Labels[commonconsts.KubeLabelIsRestoreTarget] == commonconsts.KubeLabelValueTrue {
strategy = appsv1.DeploymentStrategy{
Type: appsv1.RecreateDeploymentStrategyType,
}
}
kubeDeployment.Spec = appsv1.DeploymentSpec{
Replicas: opt.dynamoComponentDeployment.Spec.Replicas,
Selector: &metav1.LabelSelector{
......@@ -1053,6 +1065,19 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
maps.Copy(podAnnotations, extraPodMetadata.Annotations)
maps.Copy(podLabels, extraPodMetadata.Labels)
}
// Restore labels are operator-controlled. Clear any stale/user-provided
// value after metadata merge; the controller re-adds it only when the
// checkpoint contract below is satisfied.
delete(podLabels, commonconsts.KubeLabelIsRestoreTarget)
// Explicit restore orchestration contract:
// only mark pods as restore targets when checkpoint material is ready.
if checkpointInfo != nil && checkpointInfo.Enabled && checkpointInfo.Ready {
podLabels[commonconsts.KubeLabelIsRestoreTarget] = commonconsts.KubeLabelValueTrue
if checkpointInfo.Hash != "" {
podLabels[commonconsts.KubeLabelCheckpointHash] = checkpointInfo.Hash
}
}
// Propagate restart annotation to pod template to trigger rolling restart
// This is the same mechanism used by kubectl rollout restart
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment