Unverified Commit 5a3fa1fd authored by Schwinn Saereesitthipitak's avatar Schwinn Saereesitthipitak Committed by GitHub
Browse files

fix(snapshot): add checkpoint and restore timing summaries (#7949)


Signed-off-by: default avatarSchwinn Saereesitthipitak <schwinns@nvidia.com>
parent a27475de
...@@ -31,14 +31,10 @@ func main() { ...@@ -31,14 +31,10 @@ func main() {
CgroupRoot: *cgroupRoot, CgroupRoot: *cgroupRoot,
} }
restoredPID, err := executor.RestoreInNamespace(context.Background(), opts, log) result, err := executor.RestoreInNamespace(context.Background(), opts, log)
if err != nil { if err != nil {
fatal(log, err, "restore failed") fatal(log, err, "restore failed")
} }
result := struct {
RestoredPID int `json:"restoredPID"`
}{RestoredPID: restoredPID}
if err := json.NewEncoder(os.Stdout).Encode(result); err != nil { if err := json.NewEncoder(os.Stdout).Encode(result); err != nil {
fatal(log, err, "Failed to write restore result") fatal(log, err, "Failed to write restore result")
} }
......
...@@ -225,11 +225,12 @@ func (w *NodeController) reconcileCheckpointPod(ctx context.Context, pod *corev1 ...@@ -225,11 +225,12 @@ func (w *NodeController) reconcileCheckpointPod(ctx context.Context, pod *corev1
return return
} }
w.log.Info("Pod ready, triggering checkpoint", "pod", podKey, "checkpoint_id", checkpointID) startedAt := time.Now()
w.log.Info("Checkpoint target detected, triggering checkpoint", "pod", podKey, "checkpoint_id", checkpointID)
emitPodEvent(ctx, w.clientset, w.log, pod, "snapshot", corev1.EventTypeNormal, "CheckpointRequested", fmt.Sprintf("Checkpoint requested: %s", checkpointID)) emitPodEvent(ctx, w.clientset, w.log, pod, "snapshot", corev1.EventTypeNormal, "CheckpointRequested", fmt.Sprintf("Checkpoint requested: %s", checkpointID))
go func() { go func() {
if err := w.runCheckpoint(ctx, pod, job, checkpointID, checkpointLocation, podKey); err != nil { if err := w.runCheckpoint(ctx, pod, job, checkpointID, checkpointLocation, podKey, startedAt); err != nil {
opLog := w.log.WithValues("pod", podKey, "checkpoint_id", checkpointID) opLog := w.log.WithValues("pod", podKey, "checkpoint_id", checkpointID)
opLog.Error(err, "Checkpoint controller worker failed") opLog.Error(err, "Checkpoint controller worker failed")
emitPodEvent(ctx, w.clientset, opLog, pod, "snapshot", corev1.EventTypeWarning, "CheckpointWorkerFailed", err.Error()) emitPodEvent(ctx, w.clientset, opLog, pod, "snapshot", corev1.EventTypeWarning, "CheckpointWorkerFailed", err.Error())
...@@ -299,11 +300,12 @@ func (w *NodeController) reconcileRestorePod(ctx context.Context, pod *corev1.Po ...@@ -299,11 +300,12 @@ func (w *NodeController) reconcileRestorePod(ctx context.Context, pod *corev1.Po
return return
} }
w.log.Info("Restore pod running, triggering external restore", "pod", podKey, "checkpoint_id", checkpointID) startedAt := time.Now()
w.log.Info("Restore target detected, triggering external restore", "pod", podKey, "checkpoint_id", checkpointID)
emitPodEvent(ctx, w.clientset, w.log, pod, "snapshot", corev1.EventTypeNormal, "RestoreRequested", fmt.Sprintf("Restore requested from checkpoint %s", checkpointID)) emitPodEvent(ctx, w.clientset, w.log, pod, "snapshot", corev1.EventTypeNormal, "RestoreRequested", fmt.Sprintf("Restore requested from checkpoint %s", checkpointID))
go func() { go func() {
if err := w.runRestore(ctx, pod, containerName, containerID, checkpointID, checkpointLocation, restoreAttemptKey); err != nil { if err := w.runRestore(ctx, pod, containerName, containerID, checkpointID, checkpointLocation, restoreAttemptKey, startedAt); err != nil {
opLog := w.log.WithValues("pod", podKey, "checkpoint_id", checkpointID) opLog := w.log.WithValues("pod", podKey, "checkpoint_id", checkpointID)
opLog.Error(err, "Restore controller worker failed") opLog.Error(err, "Restore controller worker failed")
emitPodEvent(ctx, w.clientset, opLog, pod, "snapshot", corev1.EventTypeWarning, "RestoreWorkerFailed", err.Error()) emitPodEvent(ctx, w.clientset, opLog, pod, "snapshot", corev1.EventTypeWarning, "RestoreWorkerFailed", err.Error())
...@@ -317,7 +319,7 @@ func (w *NodeController) reconcileRestorePod(ctx context.Context, pod *corev1.Po ...@@ -317,7 +319,7 @@ func (w *NodeController) reconcileRestorePod(ctx context.Context, pod *corev1.Po
// 3. Call executor.Checkpoint (inspect → configure → CUDA lock/checkpoint → CRIU dump → rootfs diff) // 3. Call executor.Checkpoint (inspect → configure → CUDA lock/checkpoint → CRIU dump → rootfs diff)
// 4. SIGUSR1 the process on success (notify workload), SIGKILL on failure (terminate immediately) // 4. SIGUSR1 the process on success (notify workload), SIGKILL on failure (terminate immediately)
// 5. Mark job as completed or failed // 5. Mark job as completed or failed
func (w *NodeController) runCheckpoint(ctx context.Context, pod *corev1.Pod, job *batchv1.Job, checkpointID, checkpointLocation, podKey string) error { func (w *NodeController) runCheckpoint(ctx context.Context, pod *corev1.Pod, job *batchv1.Job, checkpointID, checkpointLocation, podKey string, startedAt time.Time) error {
releasePodOnExit := true releasePodOnExit := true
defer func() { defer func() {
if releasePodOnExit { if releasePodOnExit {
...@@ -396,6 +398,7 @@ func (w *NodeController) runCheckpoint(ctx context.Context, pod *corev1.Pod, job ...@@ -396,6 +398,7 @@ func (w *NodeController) runCheckpoint(ctx context.Context, pod *corev1.Pod, job
ContainerName: containerName, ContainerName: containerName,
CheckpointID: checkpointID, CheckpointID: checkpointID,
CheckpointLocation: checkpointLocation, CheckpointLocation: checkpointLocation,
StartedAt: startedAt,
NodeName: w.config.NodeName, NodeName: w.config.NodeName,
PodName: pod.Name, PodName: pod.Name,
PodNamespace: pod.Namespace, PodNamespace: pod.Namespace,
...@@ -458,7 +461,7 @@ func (w *NodeController) runCheckpoint(ctx context.Context, pod *corev1.Pod, job ...@@ -458,7 +461,7 @@ func (w *NodeController) runCheckpoint(ctx context.Context, pod *corev1.Pod, job
// 3. SIGCONT the restored process to wake it up // 3. SIGCONT the restored process to wake it up
// 4. Wait for the pod to become Ready // 4. Wait for the pod to become Ready
// 5. Mark the container instance as completed // 5. Mark the container instance as completed
func (w *NodeController) runRestore(ctx context.Context, pod *corev1.Pod, containerName, containerID, checkpointID, checkpointLocation, restoreAttemptKey string) error { func (w *NodeController) runRestore(ctx context.Context, pod *corev1.Pod, containerName, containerID, checkpointID, checkpointLocation, restoreAttemptKey string, startedAt time.Time) error {
releaseOnExit := true releaseOnExit := true
defer func() { defer func() {
if releaseOnExit { if releaseOnExit {
...@@ -493,6 +496,7 @@ func (w *NodeController) runRestore(ctx context.Context, pod *corev1.Pod, contai ...@@ -493,6 +496,7 @@ func (w *NodeController) runRestore(ctx context.Context, pod *corev1.Pod, contai
req := executor.RestoreRequest{ req := executor.RestoreRequest{
CheckpointID: checkpointID, CheckpointID: checkpointID,
CheckpointLocation: checkpointLocation, CheckpointLocation: checkpointLocation,
StartedAt: startedAt,
NSRestorePath: w.config.Restore.NSRestorePath, NSRestorePath: w.config.Restore.NSRestorePath,
PodName: pod.Name, PodName: pod.Name,
PodNamespace: pod.Namespace, PodNamespace: pod.Namespace,
......
...@@ -476,7 +476,7 @@ func TestRunCheckpointKeepsLeaseAndInFlightOnTerminalStatusPatchFailure(t *testi ...@@ -476,7 +476,7 @@ func TestRunCheckpointKeepsLeaseAndInFlightOnTerminalStatusPatchFailure(t *testi
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
err := w.runCheckpoint(context.Background(), pod, job, "abc123", filepath.Join(t.TempDir(), "abc123"), "default/test-pod") err := w.runCheckpoint(context.Background(), pod, job, "abc123", filepath.Join(t.TempDir(), "abc123"), "default/test-pod", time.Now())
if err == nil { if err == nil {
t.Fatal("expected terminal checkpoint status update to fail") t.Fatal("expected terminal checkpoint status update to fail")
} }
......
...@@ -71,7 +71,7 @@ func ExecuteRestore( ...@@ -71,7 +71,7 @@ func ExecuteRestore(
defer closeFiles(inheritedFiles) defer closeFiles(inheritedFiles)
notify := &restoreNotify{log: log} notify := &restoreNotify{log: log}
log.Info("Executing go-criu Restore call") log.V(1).Info("Executing go-criu Restore call")
if err := c.Restore(criuOpts, notify); err != nil { if err := c.Restore(criuOpts, notify); err != nil {
log.Error(err, "go-criu Restore returned error") log.Error(err, "go-criu Restore returned error")
logging.LogRestoreErrors(checkpointPath, settings.WorkDir, log) logging.LogRestoreErrors(checkpointPath, settings.WorkDir, log)
...@@ -88,7 +88,7 @@ func BuildRestoreOpts(m *types.CheckpointManifest, checkpointPath string, cgroup ...@@ -88,7 +88,7 @@ func BuildRestoreOpts(m *types.CheckpointManifest, checkpointPath string, cgroup
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Info("Generated external mount map set", "ext_mount_count", len(extMounts)) log.V(1).Info("Generated external mount map set", "ext_mount_count", len(extMounts))
settings := m.CRIUDump.CRIU settings := m.CRIUDump.CRIU
criuOpts := &criurpc.CriuOpts{ criuOpts := &criurpc.CriuOpts{
...@@ -137,7 +137,7 @@ func buildRestoreExtMounts(m *types.CheckpointManifest) ([]*criurpc.ExtMountMap, ...@@ -137,7 +137,7 @@ func buildRestoreExtMounts(m *types.CheckpointManifest) ([]*criurpc.ExtMountMap,
func registerInheritFDs(c *criulib.Criu, stdioFDs []string, log logr.Logger) []*os.File { func registerInheritFDs(c *criulib.Criu, stdioFDs []string, log logr.Logger) []*os.File {
if len(stdioFDs) == 0 { if len(stdioFDs) == 0 {
log.Info("No stdio FD descriptors in manifest, skipping inherit-fd setup") log.V(1).Info("No stdio FD descriptors in manifest, skipping inherit-fd setup")
return nil return nil
} }
...@@ -161,7 +161,7 @@ func registerInheritFDs(c *criulib.Criu, stdioFDs []string, log logr.Logger) []* ...@@ -161,7 +161,7 @@ func registerInheritFDs(c *criulib.Criu, stdioFDs []string, log logr.Logger) []*
c.AddInheritFd(target, f) c.AddInheritFd(target, f)
} }
log.Info("Registered inherited stdio pipes", "count", len(openFiles)) log.V(1).Info("Registered inherited stdio pipes", "count", len(openFiles))
return openFiles return openFiles
} }
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"google.golang.org/grpc" "google.golang.org/grpc"
...@@ -25,6 +26,14 @@ var podResourcesSocketPath = "/var/lib/kubelet/pod-resources/kubelet.sock" ...@@ -25,6 +26,14 @@ var podResourcesSocketPath = "/var/lib/kubelet/pod-resources/kubelet.sock"
var gpuUUIDPattern = regexp.MustCompile(`^GPU-[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$`) var gpuUUIDPattern = regexp.MustCompile(`^GPU-[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$`)
type CheckpointPhaseTimings struct {
TotalDuration time.Duration
}
type RestorePhaseTimings struct {
TotalDuration time.Duration
}
// GetPodGPUUUIDs resolves GPU UUIDs for a pod/container from kubelet // GetPodGPUUUIDs resolves GPU UUIDs for a pod/container from kubelet
// PodResources (nvidia.com/gpu entries in GetDevices()). // PodResources (nvidia.com/gpu entries in GetDevices()).
func GetPodGPUUUIDs(ctx context.Context, podName, podNamespace, containerName string) ([]string, error) { func GetPodGPUUUIDs(ctx context.Context, podName, podNamespace, containerName string) ([]string, error) {
...@@ -174,38 +183,52 @@ func BuildDeviceMap(sourceUUIDs, targetUUIDs []string, log logr.Logger) (string, ...@@ -174,38 +183,52 @@ func BuildDeviceMap(sourceUUIDs, targetUUIDs []string, log logr.Logger) (string,
// LockAndCheckpointProcessTree locks and checkpoints CUDA state for all given PIDs. // LockAndCheckpointProcessTree locks and checkpoints CUDA state for all given PIDs.
// On failure, the caller is expected to fail the operation and terminate the workload. // On failure, the caller is expected to fail the operation and terminate the workload.
func LockAndCheckpointProcessTree(ctx context.Context, cudaPIDs []int, log logr.Logger) error { func LockAndCheckpointProcessTree(ctx context.Context, cudaPIDs []int, log logr.Logger) (CheckpointPhaseTimings, error) {
var timings CheckpointPhaseTimings
start := time.Now()
for _, pid := range cudaPIDs { for _, pid := range cudaPIDs {
if err := lock(ctx, pid, log); err != nil { if err := lock(ctx, pid, log); err != nil {
return err timings.TotalDuration = time.Since(start)
return timings, err
} }
} }
for _, pid := range cudaPIDs { for _, pid := range cudaPIDs {
if err := checkpoint(ctx, pid, log); err != nil { if err := checkpoint(ctx, pid, log); err != nil {
return err timings.TotalDuration = time.Since(start)
return timings, err
} }
} }
timings.TotalDuration = time.Since(start)
return nil return timings, nil
} }
// RestoreAndUnlockProcessTree restores and unlocks CUDA state for the given PIDs. // RestoreAndUnlockProcessTree restores and unlocks CUDA state for the given PIDs.
func RestoreAndUnlockProcessTree(ctx context.Context, cudaPIDs []int, deviceMap string, log logr.Logger) error { func RestoreAndUnlockProcessTree(ctx context.Context, cudaPIDs []int, deviceMap string, log logr.Logger) (RestorePhaseTimings, error) {
var timings RestorePhaseTimings
start := time.Now()
for _, pid := range cudaPIDs { for _, pid := range cudaPIDs {
if err := restoreProcess(ctx, pid, deviceMap, log); err != nil { if err := restoreProcess(ctx, pid, deviceMap, log); err != nil {
return err timings.TotalDuration = time.Since(start)
return timings, err
} }
} }
for _, pid := range cudaPIDs { for _, pid := range cudaPIDs {
if err := unlock(ctx, pid, log); err != nil { if err := unlock(ctx, pid, log); err != nil {
timings.TotalDuration = time.Since(start)
state, stateErr := getState(ctx, pid) state, stateErr := getState(ctx, pid)
if stateErr == nil && state == "running" { if stateErr == nil && state == "running" {
log.Info("cuda-checkpoint-helper unlock returned error but process is already running", "pid", pid) log.Info("cuda-checkpoint-helper unlock returned error but process is already running", "pid", pid)
continue continue
} }
return err return timings, err
} }
} }
return nil timings.TotalDuration = time.Since(start)
return timings, nil
} }
...@@ -82,7 +82,7 @@ func runAction(ctx context.Context, pid int, action, deviceMap string, log logr. ...@@ -82,7 +82,7 @@ func runAction(ctx context.Context, pid int, action, deviceMap string, log logr.
) )
return fmt.Errorf("cuda-checkpoint-helper %v failed for pid %d after %s: %w (output: %s)", args, pid, duration, err, out) return fmt.Errorf("cuda-checkpoint-helper %v failed for pid %d after %s: %w (output: %s)", args, pid, duration, err, out)
} }
log.Info("cuda-checkpoint-helper command succeeded", log.V(1).Info("cuda-checkpoint-helper command succeeded",
"pid", pid, "pid", pid,
"outermost_pid", details.OutermostPID, "outermost_pid", details.OutermostPID,
"innermost_pid", details.InnermostPID, "innermost_pid", details.InnermostPID,
......
...@@ -28,12 +28,21 @@ type CheckpointRequest struct { ...@@ -28,12 +28,21 @@ type CheckpointRequest struct {
ContainerName string ContainerName string
CheckpointID string CheckpointID string
CheckpointLocation string CheckpointLocation string
StartedAt time.Time
NodeName string NodeName string
PodName string PodName string
PodNamespace string PodNamespace string
Clientset kubernetes.Interface Clientset kubernetes.Interface
} }
type checkpointPhaseTimings struct {
PrepareDuration time.Duration
CUDADuration time.Duration
CRIUDumpDuration time.Duration
OverlayCaptureDuration time.Duration
FinalizeDuration time.Duration
}
// Checkpoint performs a CRIU dump of a container. // Checkpoint performs a CRIU dump of a container.
// The operation has three phases: inspect, configure, capture. // The operation has three phases: inspect, configure, capture.
// //
...@@ -42,6 +51,8 @@ type CheckpointRequest struct { ...@@ -42,6 +51,8 @@ type CheckpointRequest struct {
// renamed into place at the base path root. // renamed into place at the base path root.
func Checkpoint(ctx context.Context, ctrd *containerd.Client, log logr.Logger, req CheckpointRequest, cfg *types.AgentConfig) error { func Checkpoint(ctx context.Context, ctrd *containerd.Client, log logr.Logger, req CheckpointRequest, cfg *types.AgentConfig) error {
checkpointStart := time.Now() checkpointStart := time.Now()
phaseTimings := checkpointPhaseTimings{}
prepareStart := time.Now()
log.Info("=== Starting checkpoint operation ===") log.Info("=== Starting checkpoint operation ===")
if strings.TrimSpace(req.CheckpointID) == "" { if strings.TrimSpace(req.CheckpointID) == "" {
...@@ -73,26 +84,46 @@ func Checkpoint(ctx context.Context, ctrd *containerd.Client, log logr.Logger, r ...@@ -73,26 +84,46 @@ func Checkpoint(ctx context.Context, ctrd *containerd.Client, log logr.Logger, r
if err != nil { if err != nil {
return err return err
} }
phaseTimings.PrepareDuration = time.Since(prepareStart)
// Phase 3: Capture — CRIU dump, rootfs diff // Phase 3: Capture — CRIU dump, rootfs diff
criuDumpDuration, err := captureCheckpoint(ctx, criuOpts, &cfg.CRIU, data, state, tmpDir, log) captureTimings, err := captureCheckpoint(ctx, criuOpts, &cfg.CRIU, data, state, tmpDir, log)
if err != nil { if err != nil {
return err return err
} }
phaseTimings.CUDADuration = captureTimings.CUDADuration
phaseTimings.CRIUDumpDuration = captureTimings.CRIUDumpDuration
phaseTimings.OverlayCaptureDuration = captureTimings.OverlayCaptureDuration
// Remove any previous checkpoint with the same identity hash before finalizing // Remove any previous checkpoint with the same identity hash, then
// promote the staged checkpoint directory into place.
finalizeStart := time.Now()
if err := os.RemoveAll(finalDir); err != nil { if err := os.RemoveAll(finalDir); err != nil {
return fmt.Errorf("failed to remove previous checkpoint directory: %w", err) return fmt.Errorf("failed to remove previous checkpoint directory: %w", err)
} }
if err := os.Rename(tmpDir, finalDir); err != nil { if err := os.Rename(tmpDir, finalDir); err != nil {
return fmt.Errorf("failed to finalize checkpoint directory: %w", err) return fmt.Errorf("failed to finalize checkpoint directory: %w", err)
} }
phaseTimings.FinalizeDuration = time.Since(finalizeStart)
totalDuration := time.Since(checkpointStart) totalDuration := time.Since(checkpointStart)
log.Info("=== Checkpoint operation completed ===", log.Info("Checkpoint timing summary",
"total_duration", totalDuration, "checkpoint", map[string]any{
"criu_dump_duration", criuDumpDuration, "duration": totalDuration.String(),
"phases": map[string]string{
"prepare_duration": phaseTimings.PrepareDuration.String(),
"cuda_duration": phaseTimings.CUDADuration.String(),
"criu_dump_duration": phaseTimings.CRIUDumpDuration.String(),
"overlay_capture_duration": phaseTimings.OverlayCaptureDuration.String(),
"finalize_duration": phaseTimings.FinalizeDuration.String(),
},
},
)
if !req.StartedAt.IsZero() {
log.Info("Checkpoint wall time from agent detection",
"started_to_checkpoint_complete", time.Since(req.StartedAt),
) )
}
return nil return nil
} }
...@@ -156,7 +187,7 @@ func inspectContainer(ctx context.Context, ctrd *containerd.Client, log logr.Log ...@@ -156,7 +187,7 @@ func inspectContainer(ctx context.Context, ctrd *containerd.Client, log logr.Log
cudaNamespacePIDs = append(cudaNamespacePIDs, process.InnermostPID) cudaNamespacePIDs = append(cudaNamespacePIDs, process.InnermostPID)
} }
if len(cudaHostPIDs) > 0 { if len(cudaHostPIDs) > 0 {
log.Info("Resolved checkpoint CUDA PID mapping", "host_pids", cudaHostPIDs, "namespace_pids", cudaNamespacePIDs) log.V(1).Info("Resolved checkpoint CUDA PID mapping", "host_pids", cudaHostPIDs, "namespace_pids", cudaNamespacePIDs)
} }
var gpuUUIDs []string var gpuUUIDs []string
if len(cudaHostPIDs) > 0 { if len(cudaHostPIDs) > 0 {
...@@ -218,30 +249,37 @@ func configureCheckpoint( ...@@ -218,30 +249,37 @@ func configureCheckpoint(
return criuOpts, m, nil return criuOpts, m, nil
} }
func captureCheckpoint(ctx context.Context, criuOpts *criurpc.CriuOpts, criuSettings *types.CRIUSettings, data *types.CheckpointManifest, state *types.CheckpointContainerSnapshot, checkpointDir string, log logr.Logger) (time.Duration, error) { func captureCheckpoint(ctx context.Context, criuOpts *criurpc.CriuOpts, criuSettings *types.CRIUSettings, data *types.CheckpointManifest, state *types.CheckpointContainerSnapshot, checkpointDir string, log logr.Logger) (*checkpointPhaseTimings, error) {
timings := &checkpointPhaseTimings{}
// CUDA lock+checkpoint must happen before CRIU dump // CUDA lock+checkpoint must happen before CRIU dump
if len(state.CUDAHostPIDs) > 0 { if len(state.CUDAHostPIDs) > 0 {
if err := cuda.LockAndCheckpointProcessTree(ctx, state.CUDAHostPIDs, log); err != nil { cudaTimings, err := cuda.LockAndCheckpointProcessTree(ctx, state.CUDAHostPIDs, log)
return 0, fmt.Errorf("CUDA checkpoint failed: %w", err) if err != nil {
return nil, fmt.Errorf("CUDA checkpoint failed: %w", err)
} }
timings.CUDADuration = cudaTimings.TotalDuration
} }
criuDumpDuration, err := criu.ExecuteDump(criuOpts, checkpointDir, criuSettings, log) criuDumpDuration, err := criu.ExecuteDump(criuOpts, checkpointDir, criuSettings, log)
if err != nil { if err != nil {
return 0, err return nil, err
} }
timings.CRIUDumpDuration = criuDumpDuration
// Overlay rootfs diff capture is best-effort. Failures are logged but not // Overlay rootfs diff capture is best-effort. Failures are logged but not
// propagated — a checkpoint without overlay diffs is still valid for restore // propagated — a checkpoint without overlay diffs is still valid for restore
// (the base container image provides the filesystem). // (the base container image provides the filesystem).
if state.UpperDir != "" { if state.UpperDir != "" {
overlayCaptureStart := time.Now()
if _, err := snapshotruntime.CaptureRootfsDiff(state.UpperDir, checkpointDir, data.Overlay.Exclusions, data.Overlay.BindMountDests); err != nil { if _, err := snapshotruntime.CaptureRootfsDiff(state.UpperDir, checkpointDir, data.Overlay.Exclusions, data.Overlay.BindMountDests); err != nil {
log.Error(err, "Failed to capture rootfs diff") log.Error(err, "Failed to capture rootfs diff")
} }
if _, err := snapshotruntime.CaptureDeletedFiles(state.UpperDir, checkpointDir); err != nil { if _, err := snapshotruntime.CaptureDeletedFiles(state.UpperDir, checkpointDir); err != nil {
log.Error(err, "Failed to capture deleted files") log.Error(err, "Failed to capture deleted files")
} }
timings.OverlayCaptureDuration = time.Since(overlayCaptureStart)
} }
return criuDumpDuration, nil return timings, nil
} }
...@@ -22,8 +22,15 @@ type RestoreOptions struct { ...@@ -22,8 +22,15 @@ type RestoreOptions struct {
CgroupRoot string CgroupRoot string
} }
type RestoreInNamespaceResult struct {
RestoredPID int `json:"restoredPID"`
NSRestoreSetupDuration time.Duration `json:"nsrestoreSetupDuration"`
CRIURestoreDuration time.Duration `json:"criuRestoreDuration"`
CUDADuration time.Duration `json:"cudaDuration"`
}
// RestoreInNamespace performs a full restore from inside the target container's namespaces. // RestoreInNamespace performs a full restore from inside the target container's namespaces.
func RestoreInNamespace(ctx context.Context, opts RestoreOptions, log logr.Logger) (int, error) { func RestoreInNamespace(ctx context.Context, opts RestoreOptions, log logr.Logger) (*RestoreInNamespaceResult, error) {
restoreStart := time.Now() restoreStart := time.Now()
log.Info("Starting nsrestore workflow", log.Info("Starting nsrestore workflow",
"checkpoint_path", opts.CheckpointPath, "checkpoint_path", opts.CheckpointPath,
...@@ -31,49 +38,77 @@ func RestoreInNamespace(ctx context.Context, opts RestoreOptions, log logr.Logge ...@@ -31,49 +38,77 @@ func RestoreInNamespace(ctx context.Context, opts RestoreOptions, log logr.Logge
"cgroup_root", opts.CgroupRoot, "cgroup_root", opts.CgroupRoot,
) )
manifestReadStart := time.Now()
m, err := types.ReadManifest(opts.CheckpointPath) m, err := types.ReadManifest(opts.CheckpointPath)
if err != nil { if err != nil {
return 0, fmt.Errorf("failed to read manifest: %w", err) return nil, fmt.Errorf("failed to read manifest: %w", err)
} }
log.Info("Loaded checkpoint manifest", manifestReadDuration := time.Since(manifestReadStart)
log.V(1).Info("Loaded checkpoint manifest",
"ext_mounts", len(m.CRIUDump.ExtMnt), "ext_mounts", len(m.CRIUDump.ExtMnt),
"criu_log_level", m.CRIUDump.CRIU.LogLevel, "criu_log_level", m.CRIUDump.CRIU.LogLevel,
"manage_cgroups_mode", m.CRIUDump.CRIU.ManageCgroupsMode, "manage_cgroups_mode", m.CRIUDump.CRIU.ManageCgroupsMode,
"checkpoint_has_cuda", !m.CUDA.IsEmpty(), "checkpoint_has_cuda", !m.CUDA.IsEmpty(),
) )
// Phase 1: Configure — build CRIU opts from manifest // Phase 1: Configure — build CRIU opts from manifest
configureStart := time.Now()
criuOpts, err := criu.BuildRestoreOpts(m, opts.CheckpointPath, opts.CgroupRoot, log) criuOpts, err := criu.BuildRestoreOpts(m, opts.CheckpointPath, opts.CgroupRoot, log)
if err != nil { if err != nil {
return 0, err return nil, err
} }
configureDuration := time.Since(configureStart)
// Phase 2: Execute — rootfs, CRIU restore, CUDA restore // Phase 2: Execute — rootfs, CRIU restore, CUDA restore
restoredPID, err := executeRestore(ctx, criuOpts, m, opts, log) executeTimings, restoredPID, err := executeRestore(ctx, criuOpts, m, opts, log)
if err != nil { if err != nil {
return 0, err return nil, err
} }
log.Info("nsrestore completed", "restored_pid", restoredPID, "duration", time.Since(restoreStart)) result := &RestoreInNamespaceResult{
return restoredPID, nil RestoredPID: restoredPID,
NSRestoreSetupDuration: manifestReadDuration + configureDuration + executeTimings.nsrestoreSetupDuration,
CRIURestoreDuration: executeTimings.criuRestoreDuration,
CUDADuration: executeTimings.cudaDuration,
}
log.V(1).Info("nsrestore timing summary",
"restored_pid", restoredPID,
"nsrestore_setup_duration", result.NSRestoreSetupDuration,
"criu_restore_duration", result.CRIURestoreDuration,
"cuda_duration", result.CUDADuration,
"total_duration", time.Since(restoreStart),
)
return result, nil
} }
func executeRestore(ctx context.Context, criuOpts *criurpc.CriuOpts, m *types.CheckpointManifest, opts RestoreOptions, log logr.Logger) (int, error) { type nsrestorePhaseTimings struct {
nsrestoreSetupDuration time.Duration
criuRestoreDuration time.Duration
cudaDuration time.Duration
}
func executeRestore(ctx context.Context, criuOpts *criurpc.CriuOpts, m *types.CheckpointManifest, opts RestoreOptions, log logr.Logger) (*nsrestorePhaseTimings, int, error) {
timings := &nsrestorePhaseTimings{}
// Apply rootfs diff inside the namespace (target root is /) // Apply rootfs diff inside the namespace (target root is /)
nsrestoreSetupStart := time.Now()
if err := snapshotruntime.ApplyRootfsDiff(opts.CheckpointPath, "/", log); err != nil { if err := snapshotruntime.ApplyRootfsDiff(opts.CheckpointPath, "/", log); err != nil {
return 0, fmt.Errorf("rootfs diff failed: %w", err) return nil, 0, fmt.Errorf("rootfs diff failed: %w", err)
} }
if err := snapshotruntime.ApplyDeletedFiles(opts.CheckpointPath, "/", log); err != nil { if err := snapshotruntime.ApplyDeletedFiles(opts.CheckpointPath, "/", log); err != nil {
log.Error(err, "Failed to apply deleted files") log.Error(err, "Failed to apply deleted files")
} }
// Unmount placeholder's /dev/shm so CRIU can recreate tmpfs with checkpointed content // Unmount placeholder's /dev/shm so CRIU can recreate tmpfs with checkpointed content
if err := syscall.Unmount("/dev/shm", 0); err != nil { if err := syscall.Unmount("/dev/shm", 0); err != nil {
return 0, fmt.Errorf("failed to unmount /dev/shm before restore: %w", err) return nil, 0, fmt.Errorf("failed to unmount /dev/shm before restore: %w", err)
} }
if err := snapshotruntime.RemountProcSys(true); err != nil { if err := snapshotruntime.RemountProcSys(true); err != nil {
return 0, fmt.Errorf("failed to remount /proc/sys read-write for restore: %w", err) return nil, 0, fmt.Errorf("failed to remount /proc/sys read-write for restore: %w", err)
} }
timings.nsrestoreSetupDuration = time.Since(nsrestoreSetupStart)
defer func() { defer func() {
if err := snapshotruntime.RemountProcSys(false); err != nil { if err := snapshotruntime.RemountProcSys(false); err != nil {
log.Error(err, "Failed to remount /proc/sys read-only after restore") log.Error(err, "Failed to remount /proc/sys read-only after restore")
...@@ -81,22 +116,26 @@ func executeRestore(ctx context.Context, criuOpts *criurpc.CriuOpts, m *types.Ch ...@@ -81,22 +116,26 @@ func executeRestore(ctx context.Context, criuOpts *criurpc.CriuOpts, m *types.Ch
}() }()
// CRIU restore // CRIU restore
criuRestoreStart := time.Now()
restoredPID, err := criu.ExecuteRestore(criuOpts, m, opts.CheckpointPath, log) restoredPID, err := criu.ExecuteRestore(criuOpts, m, opts.CheckpointPath, log)
if err != nil { if err != nil {
return 0, err return nil, 0, err
} }
timings.criuRestoreDuration = time.Since(criuRestoreStart)
cudaStart := time.Now()
processes, err := snapshotruntime.ReadProcessTable("/proc") processes, err := snapshotruntime.ReadProcessTable("/proc")
if err != nil { if err != nil {
return 0, fmt.Errorf("failed to read restored process table: %w", err) return nil, 0, fmt.Errorf("failed to read restored process table: %w", err)
} }
log.Info("Restored process table snapshot", log.V(1).Info("Restored process table snapshot",
"proc_root", "/proc", "proc_root", "/proc",
"criu_callback_pid", restoredPID, "criu_callback_pid", restoredPID,
"process_count", len(processes), "process_count", len(processes),
"manifest_cuda_pids", m.CUDA.PIDs, "manifest_cuda_pids", m.CUDA.PIDs,
) )
for _, process := range processes { for _, process := range processes {
log.Info("Restored process entry", log.V(1).Info("Restored process entry",
"observed_pid", process.ObservedPID, "observed_pid", process.ObservedPID,
"parent_pid", process.ParentPID, "parent_pid", process.ParentPID,
"outermost_pid", process.OutermostPID, "outermost_pid", process.OutermostPID,
...@@ -111,17 +150,19 @@ func executeRestore(ctx context.Context, criuOpts *criurpc.CriuOpts, m *types.Ch ...@@ -111,17 +150,19 @@ func executeRestore(ctx context.Context, criuOpts *criurpc.CriuOpts, m *types.Ch
if !m.CUDA.IsEmpty() { if !m.CUDA.IsEmpty() {
restorePIDs, err := snapshotruntime.ResolveManifestPIDsToObservedPIDs(processes, int(restoredPID), m.CUDA.PIDs) restorePIDs, err := snapshotruntime.ResolveManifestPIDsToObservedPIDs(processes, int(restoredPID), m.CUDA.PIDs)
if err != nil { if err != nil {
return 0, fmt.Errorf("failed to resolve restored CUDA PIDs: %w", err) return nil, 0, fmt.Errorf("failed to resolve restored CUDA PIDs: %w", err)
} }
log.Info("Resolved manifest CUDA PIDs to current restore PIDs", log.V(1).Info("Resolved manifest CUDA PIDs to current restore PIDs",
"manifest_cuda_pids", m.CUDA.PIDs, "manifest_cuda_pids", m.CUDA.PIDs,
"restored_cuda_pids", restorePIDs, "restored_cuda_pids", restorePIDs,
"criu_callback_pid", restoredPID, "criu_callback_pid", restoredPID,
) )
if err := cuda.RestoreAndUnlockProcessTree(ctx, restorePIDs, opts.CUDADeviceMap, log); err != nil { _, err = cuda.RestoreAndUnlockProcessTree(ctx, restorePIDs, opts.CUDADeviceMap, log)
return 0, fmt.Errorf("CUDA restore failed: %w", err) if err != nil {
return nil, 0, fmt.Errorf("CUDA restore failed: %w", err)
} }
} }
timings.cudaDuration = time.Since(cudaStart)
return int(restoredPID), nil return timings, int(restoredPID), nil
} }
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
type RestoreRequest struct { type RestoreRequest struct {
CheckpointID string CheckpointID string
CheckpointLocation string CheckpointLocation string
StartedAt time.Time
NSRestorePath string NSRestorePath string
PodName string PodName string
PodNamespace string PodNamespace string
...@@ -47,30 +48,53 @@ func Restore(ctx context.Context, ctrd *containerd.Client, log logr.Logger, req ...@@ -47,30 +48,53 @@ func Restore(ctx context.Context, ctrd *containerd.Client, log logr.Logger, req
"container", req.ContainerName, "container", req.ContainerName,
) )
// Phase 1: Inspect — resolve placeholder, discover target GPUs, build device map // Phase 1: Host inspect — resolve placeholder, discover target GPUs, build device map
hostInspectStart := time.Now()
snap, err := inspectRestore(ctx, ctrd, log, req) snap, err := inspectRestore(ctx, ctrd, log, req)
if err != nil { if err != nil {
return 0, err return 0, err
} }
hostInspectDuration := time.Since(hostInspectStart)
// Phase 2: Execute — nsrestore handles rootfs, CRIU restore, and CUDA restore inside namespace // Phase 2: Execute — nsrestore handles rootfs, CRIU restore, and CUDA restore inside namespace
restoredPID, err := execNSRestore(ctx, log, req, snap) result, err := execNSRestore(ctx, log, req, snap)
if err != nil { if err != nil {
return 0, fmt.Errorf("nsrestore failed: %w", err) return 0, fmt.Errorf("nsrestore failed: %w", err)
} }
log.Info("nsrestore completed", "restored_pid", restoredPID) restoreDuration := hostInspectDuration + result.NSRestoreSetupDuration + result.CRIURestoreDuration + result.CUDADuration
log.Info("Restore timing summary",
"restore", map[string]any{
"duration": restoreDuration.String(),
"phases": map[string]string{
"host_inspect_duration": hostInspectDuration.String(),
"nsrestore_setup_duration": result.NSRestoreSetupDuration.String(),
"criu_restore_duration": result.CRIURestoreDuration.String(),
"cuda_duration": result.CUDADuration.String(),
},
},
)
if !req.StartedAt.IsZero() {
log.Info("Restore wall time from agent detection",
"started_to_restore_complete", time.Since(req.StartedAt),
)
}
// Validate restored process from the host side // Validate restored process from the host side
validationStart := time.Now()
procRoot := filepath.Join(snap.TargetRoot, "proc") procRoot := filepath.Join(snap.TargetRoot, "proc")
if err := snapshotruntime.ValidateProcessState(procRoot, restoredPID); err != nil { if err := snapshotruntime.ValidateProcessState(procRoot, result.RestoredPID); err != nil {
restoreLogPath := filepath.Join(snap.TargetRoot, "var", "criu-work", criu.RestoreLogFilename) restoreLogPath := filepath.Join(snap.TargetRoot, "var", "criu-work", criu.RestoreLogFilename)
logging.LogProcessDiagnostics(procRoot, restoredPID, restoreLogPath, log) logging.LogProcessDiagnostics(procRoot, result.RestoredPID, restoreLogPath, log)
return 0, fmt.Errorf("restored process failed post-restore validation: %w", err) return 0, fmt.Errorf("restored process failed post-restore validation: %w", err)
} }
log.Info("=== External restore completed ===", "total_duration", time.Since(restoreStart)) log.Info("=== External restore completed ===",
"restored_pid", result.RestoredPID,
"validation_duration", time.Since(validationStart),
"total_duration", time.Since(restoreStart),
)
return restoredPID, nil return result.RestoredPID, nil
} }
func inspectRestore(ctx context.Context, ctrd *containerd.Client, log logr.Logger, req RestoreRequest) (*types.RestoreContainerSnapshot, error) { func inspectRestore(ctx context.Context, ctrd *containerd.Client, log logr.Logger, req RestoreRequest) (*types.RestoreContainerSnapshot, error) {
...@@ -105,7 +129,7 @@ func inspectRestore(ctx context.Context, ctrd *containerd.Client, log logr.Logge ...@@ -105,7 +129,7 @@ func inspectRestore(ctx context.Context, ctrd *containerd.Client, log logr.Logge
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to resolve placeholder container: %w", err) return nil, fmt.Errorf("failed to resolve placeholder container: %w", err)
} }
log.Info("Resolved placeholder container", "pid", placeholderPID) log.V(1).Info("Resolved placeholder container", "pid", placeholderPID)
cgroupRoot, err := snapshotruntime.ResolveCgroupRootFromHostPID(placeholderPID) cgroupRoot, err := snapshotruntime.ResolveCgroupRootFromHostPID(placeholderPID)
if err != nil { if err != nil {
...@@ -137,7 +161,7 @@ func inspectRestore(ctx context.Context, ctrd *containerd.Client, log logr.Logge ...@@ -137,7 +161,7 @@ func inspectRestore(ctx context.Context, ctrd *containerd.Client, log logr.Logge
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to build CUDA device map: %w", err) return nil, fmt.Errorf("failed to build CUDA device map: %w", err)
} }
log.Info("GPU UUIDs for device map", log.V(1).Info("GPU UUIDs for device map",
"source_uuids", m.CUDA.SourceGPUUUIDs, "source_uuids", m.CUDA.SourceGPUUUIDs,
"target_uuids", targetGPUUUIDs, "target_uuids", targetGPUUUIDs,
"device_map", cudaDeviceMap, "device_map", cudaDeviceMap,
...@@ -155,7 +179,7 @@ func inspectRestore(ctx context.Context, ctrd *containerd.Client, log logr.Logge ...@@ -155,7 +179,7 @@ func inspectRestore(ctx context.Context, ctrd *containerd.Client, log logr.Logge
// execNSRestore launches the nsrestore binary inside the placeholder container's // execNSRestore launches the nsrestore binary inside the placeholder container's
// namespaces via nsenter and parses the restored PID from stdout JSON. // namespaces via nsenter and parses the restored PID from stdout JSON.
func execNSRestore(ctx context.Context, log logr.Logger, req RestoreRequest, snap *types.RestoreContainerSnapshot) (int, error) { func execNSRestore(ctx context.Context, log logr.Logger, req RestoreRequest, snap *types.RestoreContainerSnapshot) (*RestoreInNamespaceResult, error) {
args := []string{ args := []string{
"-t", strconv.Itoa(snap.PlaceholderPID), "-t", strconv.Itoa(snap.PlaceholderPID),
// Intentionally exclude cgroup namespace (-C): CRIU must manage cgroups // Intentionally exclude cgroup namespace (-C): CRIU must manage cgroups
...@@ -181,18 +205,16 @@ func execNSRestore(ctx context.Context, log logr.Logger, req RestoreRequest, sna ...@@ -181,18 +205,16 @@ func execNSRestore(ctx context.Context, log logr.Logger, req RestoreRequest, sna
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
return 0, fmt.Errorf("nsrestore failed: %w\nstdout: %s", err, stdout.String()) return nil, fmt.Errorf("nsrestore failed: %w\nstdout: %s", err, stdout.String())
} }
var result struct { var result RestoreInNamespaceResult
RestoredPID int `json:"restoredPID"`
}
if err := json.Unmarshal(stdout.Bytes(), &result); err != nil { if err := json.Unmarshal(stdout.Bytes(), &result); err != nil {
return 0, fmt.Errorf("failed to parse nsrestore result: %w\nstdout: %s", err, stdout.String()) return nil, fmt.Errorf("failed to parse nsrestore result: %w\nstdout: %s", err, stdout.String())
} }
if result.RestoredPID <= 0 { if result.RestoredPID <= 0 {
return 0, fmt.Errorf("nsrestore returned invalid PID %d", result.RestoredPID) return nil, fmt.Errorf("nsrestore returned invalid PID %d", result.RestoredPID)
} }
return result.RestoredPID, nil return &result, nil
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment