podspec.go 3.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package checkpoint

import (
21
	"context"
22
23
24
	"fmt"

	commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
25
	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
26
	corev1 "k8s.io/api/core/v1"
27
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
28
29
30
)

// ApplyRestorePodMetadata writes restore-target labels and annotations for a
// worker pod by delegating to snapshotprotocol.ApplyRestoreTargetMetadata.
//
// The pod counts as a restore target only when checkpointInfo is non-nil and
// both Enabled and Ready. In that case the checkpoint's Hash and
// ArtifactVersion are forwarded. In every other case the helper is called with
// enabled=false and empty hash/version. What that means for existing keys
// (left alone vs. cleared) is decided by snapshotprotocol, not here.
func ApplyRestorePodMetadata(labels map[string]string, annotations map[string]string, checkpointInfo *CheckpointInfo) {
	if checkpointInfo == nil || !checkpointInfo.Enabled || !checkpointInfo.Ready {
		// Not restorable (yet): pass the disabled form with no identity.
		snapshotprotocol.ApplyRestoreTargetMetadata(labels, annotations, false, "", "")
		return
	}

	snapshotprotocol.ApplyRestoreTargetMetadata(
		labels,
		annotations,
		true,
		checkpointInfo.Hash,
		checkpointInfo.ArtifactVersion,
	)
}

41
42
43
44
45
46
47
48
49
50
51
52
53
// resolveMainContainer finds the container named "main" in the pod spec.
// ExtraPodSpec.PodSpec.Containers can inject user containers before the main
// container (mergo merge happens before main is appended), so index 0 is
// not guaranteed to be the main container here.
func resolveMainContainer(podSpec *corev1.PodSpec) *corev1.Container {
	for i := range podSpec.Containers {
		if podSpec.Containers[i].Name == commonconsts.MainContainerName {
			return &podSpec.Containers[i]
		}
	}
	return nil
}

54
func InjectCheckpointIntoPodSpec(
55
56
57
	ctx context.Context,
	reader ctrlclient.Reader,
	namespace string,
58
59
60
	podSpec *corev1.PodSpec,
	checkpointInfo *CheckpointInfo,
) error {
61
62
63
64
65
66
67
68
69
	// Only mutate the worker pod spec once the checkpoint is Ready. Before
	// the checkpoint exists, the worker must cold-start normally without
	// the snapshot-control volume, DYN_SNAPSHOT_CONTROL_DIR, checkpoint PVC
	// mount, or localhost seccomp profile — otherwise the Python worker
	// enters checkpoint mode on env-var presence and sits quiesced waiting
	// for a sentinel that only the checkpoint Job and restore-target path
	// produce. The checkpoint Job itself is built separately through
	// buildCheckpointJob + NewCheckpointJob and does get these.
	if checkpointInfo == nil || !checkpointInfo.Enabled || !checkpointInfo.Ready {
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
		return nil
	}

	info := checkpointInfo
	if info.Hash == "" {
		if info.Identity == nil {
			return fmt.Errorf("checkpoint enabled but identity is nil and hash is not set")
		}

		hash, err := ComputeIdentityHash(*info.Identity)
		if err != nil {
			return fmt.Errorf("failed to compute identity hash: %w", err)
		}
		info.Hash = hash
	}

86
	mainContainer := resolveMainContainer(podSpec)
87
	if mainContainer == nil {
88
		return fmt.Errorf("no container named %q found in pod spec", commonconsts.MainContainerName)
89
90
91
92
93
94
95
96
97
98
99
100
	}
	if reader == nil {
		return fmt.Errorf("checkpoint client is required")
	}
	if err := snapshotprotocol.PrepareRestorePodSpecForCheckpoint(
		ctx,
		reader,
		namespace,
		podSpec,
		mainContainer,
		info.Hash,
		info.ArtifactVersion,
101
		snapshotprotocol.DefaultSeccompLocalhostProfile,
102
103
		info.Ready,
	); err != nil {
104
105
106
		return err
	}

107
108
	EnsurePodInfoVolume(podSpec)
	EnsurePodInfoMount(mainContainer)
109
	if info.Ready && info.GPUMemoryService != nil && info.GPUMemoryService.Enabled {
110
		storage, err := snapshotprotocol.DiscoverAndResolveStorage(
111
112
113
114
115
116
117
118
119
			ctx,
			reader,
			namespace,
			info.Hash,
			info.ArtifactVersion,
		)
		if err != nil {
			return err
		}
120
		EnsureGMSRestoreSidecars(podSpec, mainContainer, storage)
121
122
	}

123
124
	return nil
}