metadata_builder.go 4.82 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// metadata_builder provides checkpoint metadata construction.
package checkpoint

import (
	"context"
	"strings"

	"github.com/sirupsen/logrus"

	checkpointk8s "github.com/ai-dynamo/dynamo/deploy/chrek/pkg/checkpoint/k8s"
	"github.com/ai-dynamo/dynamo/deploy/chrek/pkg/common"
)

// MetadataBuilderConfig holds configuration for building checkpoint metadata.
type MetadataBuilderConfig struct {
	CheckpointID  string
	NodeName      string
	ContainerID   string
	ContainerName string
	PodName       string
	PodNamespace  string
	PID           int
	CUDAPluginDir string
}

// BuildCheckpointMetadata constructs checkpoint metadata from container state.
func BuildCheckpointMetadata(
	ctx context.Context,
	cfg MetadataBuilderConfig,
	containerInfo *checkpointk8s.ContainerInfo,
	mounts []MountMapping,
	namespaces map[NamespaceType]*NamespaceInfo,
	k8sClient *checkpointk8s.K8sClient,
	log *logrus.Entry,
) *common.CheckpointMetadata {
	meta := common.NewCheckpointMetadata(cfg.CheckpointID)
	meta.SourceNode = cfg.NodeName
	meta.ContainerID = cfg.ContainerID
	meta.PodName = cfg.PodName
	meta.PodNamespace = cfg.PodNamespace
	meta.PID = cfg.PID
	meta.Image = containerInfo.Image

	// Populate OCI spec derived paths
	meta.MaskedPaths = containerInfo.GetMaskedPaths()
	meta.ReadonlyPaths = containerInfo.GetReadonlyPaths()

	// Build mount metadata
	ociMountByDest := buildOCIMountLookup(containerInfo, meta)

	// Get K8s volume types if available
	k8sVolumes := getK8sVolumes(ctx, k8sClient, cfg, log)

	// Add mount metadata
	for _, mount := range mounts {
		mountMeta := buildMountMetadata(mount, k8sVolumes, ociMountByDest)
		meta.Mounts = append(meta.Mounts, mountMeta)
	}

	// Add namespace metadata
	for nsType, nsInfo := range namespaces {
		meta.Namespaces = append(meta.Namespaces, common.NamespaceMetadata{
			Type:       string(nsType),
			Inode:      nsInfo.Inode,
			IsExternal: nsInfo.IsExternal,
		})
	}

	// Set CRIU options (hardcoded as always-on for K8s, stored for compatibility)
	meta.CRIUOptions = common.CRIUOptionsMetadata{
		TcpEstablished: false, // Always false - we close TCP connections
		TcpClose:       true,  // Always true - pod IPs change on restore
		ShellJob:       true,  // Always true - containers are session leaders
		FileLocks:      true,  // Always true - apps use file locks
		LeaveRunning:   true,  // Always true - keep process running after checkpoint
		LinkRemap:      true,  // Always true - handle deleted-but-open files
		ExtMasters:     true,  // Always true - external bind mount masters
	}

	return meta
}

// buildOCIMountLookup builds a lookup map from OCI mounts and populates bind mount destinations.
func buildOCIMountLookup(containerInfo *checkpointk8s.ContainerInfo, meta *common.CheckpointMetadata) map[string]checkpointk8s.MountInfo {
	ociMounts := containerInfo.GetMounts()
	ociMountByDest := make(map[string]checkpointk8s.MountInfo)
	for _, m := range ociMounts {
		ociMountByDest[m.Destination] = m
		if m.Type == "bind" {
			meta.BindMountDests = append(meta.BindMountDests, m.Destination)
		}
	}
	return ociMountByDest
}

// getK8sVolumes fetches volume types from K8s API if available.
func getK8sVolumes(ctx context.Context, k8sClient *checkpointk8s.K8sClient, cfg MetadataBuilderConfig, log *logrus.Entry) map[string]*checkpointk8s.VolumeInfo {
	if k8sClient == nil || cfg.PodNamespace == "" || cfg.PodName == "" || cfg.ContainerName == "" {
		return nil
	}

	k8sVolumes, err := k8sClient.GetPodVolumes(ctx, cfg.PodNamespace, cfg.PodName, cfg.ContainerName)
	if err != nil {
		log.WithError(err).Warn("Failed to get volume types from K8s API, falling back to path-based detection")
		return nil
	}
	log.WithField("volume_count", len(k8sVolumes)).Debug("Got volume types from K8s API")
	return k8sVolumes
}

// buildMountMetadata constructs metadata for a single mount.
func buildMountMetadata(mount MountMapping, k8sVolumes map[string]*checkpointk8s.VolumeInfo, ociMountByDest map[string]checkpointk8s.MountInfo) common.MountMetadata {
	var volumeType, volumeName string

	// Try K8s API first for accurate volume types
	if k8sVolumes != nil {
		if volInfo, ok := k8sVolumes[mount.InsidePath]; ok {
			volumeType = volInfo.VolumeType
			volumeName = volInfo.VolumeName
		}
	}

	// Fall back to path-based detection if K8s API didn't provide info
	if volumeType == "" {
		volumeType, volumeName = checkpointk8s.DetectVolumeTypeFromPath(mount.OutsidePath)
	}

	mountMeta := common.MountMetadata{
		ContainerPath: mount.InsidePath,
		HostPath:      mount.OutsidePath,
		VolumeType:    volumeType,
		VolumeName:    volumeName,
		FSType:        mount.FSType,
		ReadOnly:      strings.Contains(mount.Options, "ro"),
	}

	// Cross-reference with OCI spec mount if available
	if ociMount, ok := ociMountByDest[mount.InsidePath]; ok {
		mountMeta.OCISource = ociMount.Source
		mountMeta.OCIType = ociMount.Type
		mountMeta.OCIOptions = ociMount.Options
	}

	return mountMeta
}