// Package checkpoint provides CRIU checkpoint (dump) operations.
package checkpoint

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"

	criurpc "github.com/checkpoint-restore/go-criu/v7/rpc"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"github.com/sirupsen/logrus"
	"google.golang.org/protobuf/proto"

	"github.com/ai-dynamo/dynamo/deploy/chrek/pkg/common"
)

// ContainerInfoSnapshot holds runtime/container info needed for checkpointing.
//
// Checkpointer.introspect fills it in once per checkpoint:
//   - PID and OCISpec come from DiscoveryClient.ResolveContainer;
//   - RootFS, UpperDir, MountInfo and Namespaces are read for that PID via
//     GetRootFS, GetOverlayUpperDir, ReadMountInfoFromHostProcPath and
//     GetAllNamespaces respectively.
//
// The configure phase passes it to BuildCRIUDumpOptions and the manifest
// builders; the capture phase uses PID (for the /dev/shm capture) and
// UpperDir (for the rootfs diff).
type ContainerInfoSnapshot struct {
	PID        int
	RootFS     string
	UpperDir   string
	OCISpec    *specs.Spec
	MountInfo  []MountInfo
	Namespaces map[NamespaceType]*NamespaceInfo
}

// CheckpointManifest is saved as manifest.yaml at checkpoint time and loaded at restore.
//
// It is assembled by NewCheckpointManifest during the configure phase, where
// each section comes from its own builder:
//   - CRIUDump from NewCRIUDumpManifest (final CRIU options plus CRIU spec);
//   - K8s from NewSourcePodManifest (the checkpoint request and container PID);
//   - Filesystem from NewFilesystemManifest (rootfs exclusions, overlay
//     upperdir, OCI spec);
//   - Namespaces from NewNamespaceManifestEntries.
//
// CreatedAt is set to the UTC time at construction.
// NOTE(review): the manifest is written with WriteCheckpointManifest before the
// CRIU dump, yet the capture phase later passes it to CaptureRootfsState —
// confirm whether any updates made there are persisted to disk.
type CheckpointManifest struct {
	CheckpointID string    `yaml:"checkpointId"`
	CreatedAt    time.Time `yaml:"createdAt"`

	CRIUDump   CRIUDumpManifest         `yaml:"criuDump"`
	K8s        SourcePodManifest        `yaml:"k8s"`
	Filesystem FilesystemManifest       `yaml:"filesystem"`
	Namespaces []NamespaceManifestEntry `yaml:"namespaces"`
}

// NewCheckpointManifest combines the per-module manifest sections into a single
// CheckpointManifest for checkpointID. It stamps CreatedAt with the current
// time in UTC; every other field is copied verbatim from its argument.
func NewCheckpointManifest(
	checkpointID string,
	criuDump CRIUDumpManifest,
	k8s SourcePodManifest,
	filesystem FilesystemManifest,
	namespaces []NamespaceManifestEntry,
) *CheckpointManifest {
	m := new(CheckpointManifest)
	m.CheckpointID = checkpointID
	m.CreatedAt = time.Now().UTC()
	m.CRIUDump = criuDump
	m.K8s = k8s
	m.Filesystem = filesystem
	m.Namespaces = namespaces
	return m
}

// CheckpointRequest holds per-checkpoint identifiers for a checkpoint operation.
//
// ContainerID selects the container to dump; it is resolved through the
// discovery client. The checkpoint is written to the directory
// filepath.Join(CheckpointDir, CheckpointID). The whole request, including
// NodeName, PodName and PodNamespace, is handed to NewSourcePodManifest when
// the checkpoint manifest is built.
type CheckpointRequest struct {
	ContainerID   string
	ContainerName string // K8s container name (for K8s API volume type lookup)
	CheckpointID  string
	CheckpointDir string
	NodeName      string
	PodName       string
	PodNamespace  string
}

69
70
// CheckpointOutcome contains the result of a checkpoint operation.
type CheckpointOutcome struct {
71
72
	CheckpointID  string
	CheckpointDir string
73
	Data          *CheckpointManifest
74
75
76
77
}

// Checkpointer performs CRIU checkpoint operations
type Checkpointer struct {
78
	discoveryClient *DiscoveryClient
79
80
81
82
	log             *logrus.Entry
}

// NewCheckpointer creates a new checkpointer
83
func NewCheckpointer(discoveryClient *DiscoveryClient) *Checkpointer {
84
85
86
87
88
89
	return &Checkpointer{
		discoveryClient: discoveryClient,
		log:             logrus.WithField("component", "checkpointer"),
	}
}

90
91
92
93
94
95
// Checkpoint performs a CRIU dump of a container.
// The operation has three phases: introspect, configure, capture.
func (c *Checkpointer) Checkpoint(ctx context.Context, req CheckpointRequest, spec *CheckpointSpec) (*CheckpointOutcome, error) {
	if spec == nil {
		return nil, fmt.Errorf("checkpoint spec is required")
	}
96
97
98
	checkpointStart := time.Now()
	c.log.Info("=== Starting checkpoint operation ===")

99
	checkpointDir := filepath.Join(req.CheckpointDir, req.CheckpointID)
100
101
102
103
	if err := os.MkdirAll(checkpointDir, 0700); err != nil {
		return nil, fmt.Errorf("failed to create checkpoint directory: %w", err)
	}

104
105
106
	// Open image directory FD for CRIU — must stay open through both configure and capture
	// phases since CRIU's swrk child process inherits this FD.
	imageDir, imageDirFD, err := common.OpenPathForCRIU(checkpointDir)
107
	if err != nil {
108
		return nil, fmt.Errorf("failed to open image directory: %w", err)
109
	}
110
111
112
113
	defer imageDir.Close()

	// Phase 1: Introspect container state
	state, err := c.introspect(ctx, req.ContainerID)
114
	if err != nil {
115
		return nil, err
116
	}
117
118
119

	// Phase 2: Configure CRIU options and build checkpoint manifest.
	criuOpts, data, err := c.configure(state, req, spec, checkpointDir, imageDirFD)
120
	if err != nil {
121
		return nil, err
122
123
	}

124
125
	// Phase 3: Capture — CRIU dump, /dev/shm, rootfs diff
	criuDumpDuration, err := c.capture(criuOpts, data, state, checkpointDir)
126
127
128
129
	if err != nil {
		return nil, err
	}

130
131
132
133
134
	totalDuration := time.Since(checkpointStart)
	c.log.WithFields(logrus.Fields{
		"total_duration":     totalDuration,
		"criu_dump_duration": criuDumpDuration,
	}).Info("=== Checkpoint operation completed ===")
135

136
137
138
139
140
141
	return &CheckpointOutcome{
		CheckpointID:  req.CheckpointID,
		CheckpointDir: checkpointDir,
		Data:          data,
	}, nil
}
142

143
144
145
146
147
// introspect resolves the container and gathers all runtime state from containerd and /proc.
func (c *Checkpointer) introspect(ctx context.Context, containerID string) (*ContainerInfoSnapshot, error) {
	pid, ociSpec, err := c.discoveryClient.ResolveContainer(ctx, containerID)
	if err != nil {
		return nil, fmt.Errorf("failed to resolve container: %w", err)
148
149
	}

150
151
152
	rootFS, err := GetRootFS(pid)
	if err != nil {
		return nil, fmt.Errorf("failed to get rootfs: %w", err)
153
	}
154
155
156
	upperDir, err := GetOverlayUpperDir(pid)
	if err != nil {
		return nil, fmt.Errorf("failed to get overlay upperdir: %w", err)
157
	}
158
159
160
	mountInfo, err := ReadMountInfoFromHostProcPath(pid)
	if err != nil {
		return nil, fmt.Errorf("failed to parse mountinfo: %w", err)
161
	}
162
163
164
	namespaces, err := GetAllNamespaces(pid)
	if err != nil {
		return nil, fmt.Errorf("failed to get namespaces: %w", err)
165
166
	}

167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
	return &ContainerInfoSnapshot{
		PID:        pid,
		RootFS:     rootFS,
		UpperDir:   upperDir,
		OCISpec:    ociSpec,
		MountInfo:  mountInfo,
		Namespaces: namespaces,
	}, nil
}

// configure builds the CRIU dump options and the checkpoint manifest from the
// introspected state and the checkpoint spec. It writes two files into
// checkpointDir: the CRIU config file (named by CheckpointCRIUConfFilename)
// and the checkpoint manifest (via WriteCheckpointManifest).
//
// imageDirFD is the already-open image directory descriptor handed to CRIU.
// The caller keeps it open until the dump has finished, because CRIU's swrk
// child inherits it.
func (c *Checkpointer) configure(
	state *ContainerInfoSnapshot,
	req CheckpointRequest,
	spec *CheckpointSpec,
	checkpointDir string,
	imageDirFD int32,
) (*criurpc.CriuOpts, *CheckpointManifest, error) {
	criuOpts, err := BuildCRIUDumpOptions(
		&spec.CRIU,
		state.PID,
		imageDirFD,
		state.RootFS,
		state.MountInfo,
		state.OCISpec,
		state.Namespaces,
	)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to build CRIU dump options: %w", err)
	}

	// Options unavailable via RPC go into a CRIU config file that CRIU is
	// pointed at through ConfigFile.
	configPath := filepath.Join(checkpointDir, CheckpointCRIUConfFilename)
	if err := os.WriteFile(configPath, []byte(spec.CRIU.GenerateCRIUConfContent()), 0644); err != nil {
		return nil, nil, fmt.Errorf("failed to write CRIU config file: %w", err)
	}
	criuOpts.ConfigFile = proto.String(configPath)

	// Build and save the checkpoint manifest. The CRIU section is derived from
	// the final options, i.e. after ConfigFile has been set.
	manifest := NewCheckpointManifest(
		req.CheckpointID,
		NewCRIUDumpManifest(criuOpts, spec.CRIU),
		NewSourcePodManifest(req, state.PID),
		NewFilesystemManifest(spec.RootfsExclusions, state.UpperDir, state.OCISpec),
		NewNamespaceManifestEntries(state.Namespaces),
	)
	if err := WriteCheckpointManifest(checkpointDir, manifest); err != nil {
		return nil, nil, fmt.Errorf("failed to write checkpoint manifest: %w", err)
	}

	return criuOpts, manifest, nil
}

221
222
223
224
225
226
227
228
229
// capture executes the CRIU dump and post-dump captures (/dev/shm, rootfs diff).
// Returns the CRIU dump duration for timing reporting.
func (c *Checkpointer) capture(
	criuOpts *criurpc.CriuOpts,
	data *CheckpointManifest,
	state *ContainerInfoSnapshot,
	checkpointDir string,
) (time.Duration, error) {
	criuDumpDuration, err := ExecuteCRIUDump(criuOpts, checkpointDir, c.log)
230
	if err != nil {
231
		return 0, err
232
233
	}

234
235
236
	// Capture /dev/shm contents (must happen after dump for final process state)
	if err := CaptureDevShm(state.PID, checkpointDir, c.log); err != nil {
		c.log.WithError(err).Warn("Failed to capture /dev/shm contents")
237
238
	}

239
240
	// Capture rootfs diff and deleted files
	CaptureRootfsState(state.UpperDir, checkpointDir, data, c.log)
241

242
	return criuDumpDuration, nil
243
}