"components/vscode:/vscode.git/clone" did not exist on "100819299f6b7fa55b8b56e368ddc9f863a9fb48"
checkpoint_job.go 6.43 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
package controller
5
6

import (
7
	"context"
8
	"fmt"
9
	"strings"
10
11
12
13
14

	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
15
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/discovery"
16
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/dra"
17
18
19
20
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
21
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
)

func buildCheckpointWorkerDefaultEnv(
	ckpt *nvidiacomv1alpha1.DynamoCheckpoint,
	podTemplate *corev1.PodTemplateSpec,
) []corev1.EnvVar {
	componentType := consts.ComponentTypeWorker
	dynamoNamespace := consts.GlobalDynamoNamespace
	parentGraphDeploymentName := podTemplate.Labels[consts.KubeLabelDynamoGraphDeploymentName]
	workerHashSuffix := podTemplate.Labels[consts.KubeLabelDynamoWorkerHash]
	discoveryBackend := configv1alpha1.DiscoveryBackendKubernetes

	if podTemplate.Labels[consts.KubeLabelDynamoNamespace] != "" {
		dynamoNamespace = podTemplate.Labels[consts.KubeLabelDynamoNamespace]
	}
	if podTemplate.Labels[consts.KubeLabelDynamoComponentType] != "" &&
		dynamo.IsWorkerComponent(podTemplate.Labels[consts.KubeLabelDynamoComponentType]) {
		componentType = podTemplate.Labels[consts.KubeLabelDynamoComponentType]
	}

	defaultContainer, _ := dynamo.NewWorkerDefaults().GetBaseContainer(dynamo.ComponentContext{
		ComponentType:                  componentType,
		DynamoNamespace:                dynamoNamespace,
		ParentGraphDeploymentName:      parentGraphDeploymentName,
		ParentGraphDeploymentNamespace: ckpt.Namespace,
47
48
49
50
51
		Discovery: dynamo.DiscoveryContext{
			Backend: discoveryBackend,
			Mode:    configv1alpha1.KubeDiscoveryModePod,
		},
		WorkerHashSuffix: workerHashSuffix,
52
53
54
55
	})
	return defaultContainer.Env
}

56
func buildCheckpointJob(
57
58
	ctx context.Context,
	reader ctrlclient.Reader,
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
	config *configv1alpha1.OperatorConfiguration,
	ckpt *nvidiacomv1alpha1.DynamoCheckpoint,
	jobName string,
) (*batchv1.Job, error) {
	podTemplate := ckpt.Spec.Job.PodTemplateSpec.DeepCopy()
	hash := ckpt.Status.IdentityHash
	if hash == "" {
		var err error
		hash, err = checkpoint.ComputeIdentityHash(ckpt.Spec.Identity)
		if err != nil {
			return nil, fmt.Errorf("failed to compute identity hash: %w", err)
		}
	}

	if podTemplate.Labels == nil {
		podTemplate.Labels = make(map[string]string)
	}
	if podTemplate.Annotations == nil {
		podTemplate.Annotations = make(map[string]string)
	}
79
80
81
	if podTemplate.Spec.ServiceAccountName == "" {
		podTemplate.Spec.ServiceAccountName = discovery.GetK8sDiscoveryServiceAccountName(ckpt.Name)
	}
82
83
84

	checkpoint.EnsurePodInfoVolume(&podTemplate.Spec)

85
86
	if len(podTemplate.Spec.Containers) == 0 {
		return nil, fmt.Errorf("checkpoint job requires at least one container")
87
	}
88
	mainContainer := &podTemplate.Spec.Containers[0]
89
90
91
92
93
94
95
96
97
98
99
100
101
	mainContainer.Env = dynamo.MergeEnvs(
		buildCheckpointWorkerDefaultEnv(ckpt, podTemplate),
		mainContainer.Env,
	)
	dynamo.AddStandardEnvVars(mainContainer, config)
	mainContainer.Env = append(mainContainer.Env, corev1.EnvVar{
		Name:  consts.EnvReadyForCheckpointFile,
		Value: config.Checkpoint.ReadyForCheckpointFilePath,
	})
	mainContainer.ReadinessProbe = &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			Exec: &corev1.ExecAction{
				Command: []string{"cat", config.Checkpoint.ReadyForCheckpointFilePath},
102
			},
103
104
105
106
107
108
		},
		InitialDelaySeconds: 15,
		PeriodSeconds:       2,
	}
	mainContainer.LivenessProbe = nil
	mainContainer.StartupProbe = nil
109
110

	// The snapshot agent sends SIGUSR1 to PID 1 of the main container after
111
112
113
114
	checkpoint.EnsurePodInfoMount(mainContainer)
	dynamo.ApplySharedMemoryVolumeAndMount(&podTemplate.Spec, mainContainer, ckpt.Spec.Job.SharedMemory)

	if ckpt.Spec.GPUMemoryService != nil && ckpt.Spec.GPUMemoryService.Enabled {
115
116
117
118
119
		claimTemplateName := dra.ResourceClaimTemplateName("checkpoint-"+hash, "worker")
		if err := dra.ApplyClaim(&podTemplate.Spec, claimTemplateName); err != nil {
			return nil, fmt.Errorf("failed to apply DRA claim for GMS checkpoint: %w", err)
		}
		storage, err := snapshotprotocol.DiscoverAndResolveStorage(
120
121
122
123
124
125
126
127
128
			ctx,
			reader,
			ckpt.Namespace,
			hash,
			ckpt.Annotations[snapshotprotocol.CheckpointArtifactVersionAnnotation],
		)
		if err != nil {
			return nil, err
		}
129
		if err := checkpoint.EnsureGMSCheckpointJobSidecars(&podTemplate.Spec, mainContainer, storage); err != nil {
130
			return nil, err
131
		}
132
133
134
		// Re-acquire pointer: append in EnsureGMSCheckpointJobSidecars may
		// have reallocated the Containers slice.
		mainContainer = &podTemplate.Spec.Containers[0]
135
136
137
138
139
140
141
142
	}

	activeDeadlineSeconds := ckpt.Spec.Job.ActiveDeadlineSeconds
	if activeDeadlineSeconds == nil {
		defaultDeadline := int64(3600)
		activeDeadlineSeconds = &defaultDeadline
	}

143
144
145
146
147
148
149
150
151
152
	// Wrap with cuda-checkpoint --launch-job for multi-GPU jobs (TP*PP > 1).
	// Use checkpoint identity (not container limits) because DRA may have
	// already removed nvidia.com/gpu from the template.
	tp := ckpt.Spec.Identity.TensorParallelSize
	pp := ckpt.Spec.Identity.PipelineParallelSize
	if tp == 0 {
		tp = 1
	}
	if pp == 0 {
		pp = 1
153
	}
154
155
156
157
158
159
160
161
162
163
164
165
	wrapLaunchJob := tp*pp > 1

	// For single-GPU jobs (no cuda-checkpoint wrapper), unwrap /bin/sh -c so
	// the actual process is PID 1 and receives SIGUSR1 from the snapshot agent.
	if !wrapLaunchJob && len(mainContainer.Command) >= 2 &&
		mainContainer.Command[len(mainContainer.Command)-1] == "-c" &&
		len(mainContainer.Args) == 1 {
		parts := strings.Fields(mainContainer.Args[0])
		mainContainer.Command = parts[:1]
		mainContainer.Args = parts[1:]
	}

166
167
168
169
170
171
	ttlSecondsAfterFinish := snapshotprotocol.DefaultCheckpointJobTTLSeconds

	return snapshotprotocol.NewCheckpointJob(podTemplate, snapshotprotocol.CheckpointJobOptions{
		Namespace:             ckpt.Namespace,
		CheckpointID:          hash,
		ArtifactVersion:       snapshotprotocol.ArtifactVersion(ckpt.Annotations[snapshotprotocol.CheckpointArtifactVersionAnnotation]),
172
		SeccompProfile:        snapshotprotocol.DefaultSeccompLocalhostProfile,
173
174
175
176
177
178
		Name:                  jobName,
		ActiveDeadlineSeconds: activeDeadlineSeconds,
		TTLSecondsAfterFinish: &ttlSecondsAfterFinish,
		WrapLaunchJob:         wrapLaunchJob,
	})
}