"deploy/cloud/vscode:/vscode.git/clone" did not exist on "9b893c9340daa1e6b55c5dae34153964ecd4316e"
gms.go 4.28 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
/*
 * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

package checkpoint

import (
	"fmt"
	"path/filepath"

12
	gms "github.com/ai-dynamo/dynamo/deploy/operator/internal/gms"
13
14
	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
	corev1 "k8s.io/api/core/v1"
15
	"k8s.io/utils/ptr"
16
17
18
19
20
21
22
23
24
)

const (
	// GMSLoaderContainer and GMSSaverContainer are the names given to the
	// GMS snapshot loader (checkpoint restore) and snapshot saver
	// (checkpoint job) sidecar containers.
	GMSLoaderContainer = "gms-loader"
	GMSSaverContainer  = "gms-saver"

	// gmsCheckpointLoaderModule and gmsCheckpointSaverModule are the module
	// identifiers handed to gms.Container when building the loader and
	// saver sidecars.
	gmsCheckpointLoaderModule = "gpu_memory_service.cli.snapshot.loader"
	gmsCheckpointSaverModule  = "gpu_memory_service.cli.snapshot.saver"

	// envCheckpointDir is the environment variable name for the GMS
	// checkpoint artifact directory on the snapshot PVC.
	envCheckpointDir = "GMS_CHECKPOINT_DIR"
)
29

30
31
32
33
34
// EnsureGMSRestoreSidecars adds GMS server + loader containers to the pod spec
// for a checkpoint restore. The server runs as a regular container (not init)
// because the CRIU-restored main process already has GPU memory mapped and
// all containers must start in parallel.
func EnsureGMSRestoreSidecars(
35
36
37
	podSpec *corev1.PodSpec,
	mainContainer *corev1.Container,
	storage snapshotprotocol.Storage,
38
) {
39
	if podSpec == nil || mainContainer == nil {
40
		return
41
42
	}

43
44
45
	// The DGD path adds the GMS server as an init sidecar (blocks until
	// sockets are ready). For restore, move it to a regular container so
	// all containers start in parallel.
46
	for i := range podSpec.InitContainers {
47
		if podSpec.InitContainers[i].Name == gms.ServerContainerName {
48
49
50
51
			podSpec.InitContainers = append(podSpec.InitContainers[:i], podSpec.InitContainers[i+1:]...)
			break
		}
	}
52
53
54
	gms.EnsureSharedVolume(podSpec, mainContainer)

	snapshotprotocol.InjectCheckpointVolume(podSpec, storage.PVCName)
55

56
57
	server := gms.Container(gms.ServerContainerName, gms.ServerModule, mainContainer.Image)
	server.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways)
58

59
	loader := gms.Container(GMSLoaderContainer, gmsCheckpointLoaderModule, mainContainer.Image)
60
	loader.VolumeMounts = append(loader.VolumeMounts, corev1.VolumeMount{Name: snapshotprotocol.CheckpointVolumeName, MountPath: storage.BasePath})
61
62
	loader.Env = append(loader.Env, corev1.EnvVar{Name: envCheckpointDir, Value: resolveGMSArtifactDir(storage)})
	loader.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways)
63

64
	podSpec.InitContainers = append(podSpec.InitContainers, server, loader)
65
66
}

67
68
69
// EnsureGMSCheckpointJobSidecars adds GMS server (init) + saver containers
// to the pod spec for a checkpoint job.
func EnsureGMSCheckpointJobSidecars(
70
71
72
	podSpec *corev1.PodSpec,
	mainContainer *corev1.Container,
	storage snapshotprotocol.Storage,
73
) error {
74
	if podSpec == nil || mainContainer == nil {
75
		return nil
76
77
	}
	if len(mainContainer.Resources.Claims) == 0 {
78
		return fmt.Errorf("gms sidecars require main container resource claims (DRA must be enabled)")
79
80
	}
	if storage.PVCName == "" || storage.BasePath == "" || storage.Location == "" {
81
		return fmt.Errorf("gms checkpoint jobs require resolved checkpoint storage")
82
83
	}

84
	gmsArtifactDir := resolveGMSArtifactDir(storage)
85

86
87
	gms.EnsureServerSidecar(podSpec, mainContainer)
	snapshotprotocol.InjectCheckpointVolume(podSpec, storage.PVCName)
88

89
90
91
92
93
94
95
96
97
	saver := gms.Container(GMSSaverContainer, gmsCheckpointSaverModule, mainContainer.Image)
	saver.VolumeMounts = append(saver.VolumeMounts, corev1.VolumeMount{Name: snapshotprotocol.CheckpointVolumeName, MountPath: storage.BasePath})
	saver.Env = append(saver.Env, corev1.EnvVar{Name: envCheckpointDir, Value: gmsArtifactDir})
	// The saver is an init sidecar (restartPolicy=Always) so it doesn't
	// affect pod Ready (only the worker's probe matters) and doesn't block
	// Job completion. It saves, then sleeps until the pod terminates.
	saver.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways)
	podSpec.InitContainers = append(podSpec.InitContainers, saver)
	return nil
98
99
100
101
102
103
104
105
106
107
}

// resolveGMSArtifactDir maps a checkpoint storage location onto the directory
// used for GMS artifacts:
//
//	<BasePath>/gms/<checkpointID>/versions/<version>
//
// The version is the last path element of storage.Location and the checkpoint
// ID is the element two levels above it, so a location shaped like
// <...>/<hash>/versions/<version> yields <hash> as the ID. GMS data is kept in
// its own "gms" subtree, apart from the CRIU tree, so the non-root saver can
// create directories at the PVC root.
func resolveGMSArtifactDir(storage snapshotprotocol.Storage) string {
	location := storage.Location

	// Last element: the artifact version.
	version := filepath.Base(location)

	// Step over the intermediate directory to reach the checkpoint ID.
	versionsDir := filepath.Dir(location)
	id := filepath.Base(filepath.Dir(versionsDir))

	return filepath.Join(storage.BasePath, "gms", id, "versions", version)
}