podspec.go 3.32 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package checkpoint

import (
21
	"context"
22
23
24
	"fmt"

	commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
25
	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
26
	corev1 "k8s.io/api/core/v1"
27
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
28
29
30
)

func ApplyRestorePodMetadata(labels map[string]string, annotations map[string]string, checkpointInfo *CheckpointInfo) {
31
32
33
34
35
36
37
38
	enabled := checkpointInfo != nil && checkpointInfo.Enabled && checkpointInfo.Ready
	hash := ""
	artifactVersion := ""
	if enabled {
		hash = checkpointInfo.Hash
		artifactVersion = checkpointInfo.ArtifactVersion
	}
	snapshotprotocol.ApplyRestoreTargetMetadata(labels, annotations, enabled, hash, artifactVersion)
39
40
}

41
42
43
44
45
46
47
48
49
50
51
52
53
// resolveMainContainer finds the container named "main" in the pod spec.
// ExtraPodSpec.PodSpec.Containers can inject user containers before the main
// container (mergo merge happens before main is appended), so index 0 is
// not guaranteed to be the main container here.
func resolveMainContainer(podSpec *corev1.PodSpec) *corev1.Container {
	for i := range podSpec.Containers {
		if podSpec.Containers[i].Name == commonconsts.MainContainerName {
			return &podSpec.Containers[i]
		}
	}
	return nil
}

54
func InjectCheckpointIntoPodSpec(
55
56
57
	ctx context.Context,
	reader ctrlclient.Reader,
	namespace string,
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
	podSpec *corev1.PodSpec,
	checkpointInfo *CheckpointInfo,
) error {
	if checkpointInfo == nil || !checkpointInfo.Enabled {
		return nil
	}

	info := checkpointInfo
	if info.Hash == "" {
		if info.Identity == nil {
			return fmt.Errorf("checkpoint enabled but identity is nil and hash is not set")
		}

		hash, err := ComputeIdentityHash(*info.Identity)
		if err != nil {
			return fmt.Errorf("failed to compute identity hash: %w", err)
		}
		info.Hash = hash
	}

78
	mainContainer := resolveMainContainer(podSpec)
79
	if mainContainer == nil {
80
		return fmt.Errorf("no container named %q found in pod spec", commonconsts.MainContainerName)
81
82
83
84
85
86
87
88
89
90
91
92
	}
	if reader == nil {
		return fmt.Errorf("checkpoint client is required")
	}
	if err := snapshotprotocol.PrepareRestorePodSpecForCheckpoint(
		ctx,
		reader,
		namespace,
		podSpec,
		mainContainer,
		info.Hash,
		info.ArtifactVersion,
93
		snapshotprotocol.DefaultSeccompLocalhostProfile,
94
95
		info.Ready,
	); err != nil {
96
97
98
		return err
	}

99
100
	EnsurePodInfoVolume(podSpec)
	EnsurePodInfoMount(mainContainer)
101
	if info.Ready && info.GPUMemoryService != nil && info.GPUMemoryService.Enabled {
102
		storage, err := snapshotprotocol.DiscoverAndResolveStorage(
103
104
105
106
107
108
109
110
111
			ctx,
			reader,
			namespace,
			info.Hash,
			info.ArtifactVersion,
		)
		if err != nil {
			return err
		}
112
		EnsureGMSRestoreSidecars(podSpec, mainContainer, storage)
113
114
	}

115
116
	return nil
}