checkpoint_job.go 5.43 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
package controller
5
6

import (
7
	"context"
8
9
10
11
12
13
	"fmt"

	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
14
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/discovery"
15
16
17
18
19
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
20
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
)

func buildCheckpointWorkerDefaultEnv(
	ckpt *nvidiacomv1alpha1.DynamoCheckpoint,
	podTemplate *corev1.PodTemplateSpec,
) []corev1.EnvVar {
	componentType := consts.ComponentTypeWorker
	dynamoNamespace := consts.GlobalDynamoNamespace
	parentGraphDeploymentName := podTemplate.Labels[consts.KubeLabelDynamoGraphDeploymentName]
	workerHashSuffix := podTemplate.Labels[consts.KubeLabelDynamoWorkerHash]
	discoveryBackend := configv1alpha1.DiscoveryBackendKubernetes

	if podTemplate.Labels[consts.KubeLabelDynamoNamespace] != "" {
		dynamoNamespace = podTemplate.Labels[consts.KubeLabelDynamoNamespace]
	}
	if podTemplate.Labels[consts.KubeLabelDynamoComponentType] != "" &&
		dynamo.IsWorkerComponent(podTemplate.Labels[consts.KubeLabelDynamoComponentType]) {
		componentType = podTemplate.Labels[consts.KubeLabelDynamoComponentType]
	}

	defaultContainer, _ := dynamo.NewWorkerDefaults().GetBaseContainer(dynamo.ComponentContext{
		ComponentType:                  componentType,
		DynamoNamespace:                dynamoNamespace,
		ParentGraphDeploymentName:      parentGraphDeploymentName,
		ParentGraphDeploymentNamespace: ckpt.Namespace,
46
47
48
49
50
		Discovery: dynamo.DiscoveryContext{
			Backend: discoveryBackend,
			Mode:    configv1alpha1.KubeDiscoveryModePod,
		},
		WorkerHashSuffix: workerHashSuffix,
51
52
53
54
	})
	return defaultContainer.Env
}

55
func buildCheckpointJob(
56
57
	ctx context.Context,
	reader ctrlclient.Reader,
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
	config *configv1alpha1.OperatorConfiguration,
	ckpt *nvidiacomv1alpha1.DynamoCheckpoint,
	jobName string,
) (*batchv1.Job, error) {
	podTemplate := ckpt.Spec.Job.PodTemplateSpec.DeepCopy()
	hash := ckpt.Status.IdentityHash
	if hash == "" {
		var err error
		hash, err = checkpoint.ComputeIdentityHash(ckpt.Spec.Identity)
		if err != nil {
			return nil, fmt.Errorf("failed to compute identity hash: %w", err)
		}
	}

	if podTemplate.Labels == nil {
		podTemplate.Labels = make(map[string]string)
	}
	if podTemplate.Annotations == nil {
		podTemplate.Annotations = make(map[string]string)
	}
78
79
80
	if podTemplate.Spec.ServiceAccountName == "" {
		podTemplate.Spec.ServiceAccountName = discovery.GetK8sDiscoveryServiceAccountName(ckpt.Name)
	}
81
82
83

	checkpoint.EnsurePodInfoVolume(&podTemplate.Spec)

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
	mainContainer, err := snapshotprotocol.ResolveCheckpointWorkerContainer(&podTemplate.Spec)
	if err != nil {
		return nil, err
	}
	mainContainer.Env = dynamo.MergeEnvs(
		buildCheckpointWorkerDefaultEnv(ckpt, podTemplate),
		mainContainer.Env,
	)
	dynamo.AddStandardEnvVars(mainContainer, config)
	mainContainer.Env = append(mainContainer.Env, corev1.EnvVar{
		Name:  consts.EnvReadyForCheckpointFile,
		Value: config.Checkpoint.ReadyForCheckpointFilePath,
	})
	mainContainer.ReadinessProbe = &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			Exec: &corev1.ExecAction{
				Command: []string{"cat", config.Checkpoint.ReadyForCheckpointFilePath},
101
			},
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
		},
		InitialDelaySeconds: 15,
		PeriodSeconds:       2,
	}
	mainContainer.LivenessProbe = nil
	mainContainer.StartupProbe = nil
	checkpoint.EnsurePodInfoMount(mainContainer)
	dynamo.ApplySharedMemoryVolumeAndMount(&podTemplate.Spec, mainContainer, ckpt.Spec.Job.SharedMemory)

	var gmsSidecars []corev1.Container
	if ckpt.Spec.GPUMemoryService != nil && ckpt.Spec.GPUMemoryService.Enabled {
		storage, err := checkpoint.ResolveGMSCheckpointStorage(
			ctx,
			reader,
			ckpt.Namespace,
			hash,
			ckpt.Annotations[snapshotprotocol.CheckpointArtifactVersionAnnotation],
		)
		if err != nil {
			return nil, err
		}
		gmsSidecars, err = checkpoint.BuildGMSCheckpointJobSidecars(&podTemplate.Spec, mainContainer, storage)
		if err != nil {
			return nil, err
126
127
		}
	}
128
	podTemplate.Spec.Containers = append(podTemplate.Spec.Containers, gmsSidecars...)
129
130
131
132
133
134
135
136

	activeDeadlineSeconds := ckpt.Spec.Job.ActiveDeadlineSeconds
	if activeDeadlineSeconds == nil {
		defaultDeadline := int64(3600)
		activeDeadlineSeconds = &defaultDeadline
	}

	wrapLaunchJob := false
137
138
	if gpus, ok := mainContainer.Resources.Limits[corev1.ResourceName(consts.KubeResourceGPUNvidia)]; ok {
		wrapLaunchJob = gpus.Cmp(*resource.NewQuantity(1, resource.DecimalSI)) > 0
139
140
141
142
143
144
145
	}
	ttlSecondsAfterFinish := snapshotprotocol.DefaultCheckpointJobTTLSeconds

	return snapshotprotocol.NewCheckpointJob(podTemplate, snapshotprotocol.CheckpointJobOptions{
		Namespace:             ckpt.Namespace,
		CheckpointID:          hash,
		ArtifactVersion:       snapshotprotocol.ArtifactVersion(ckpt.Annotations[snapshotprotocol.CheckpointArtifactVersionAnnotation]),
146
		SeccompProfile:        snapshotprotocol.DefaultSeccompLocalhostProfile,
147
148
149
150
151
152
		Name:                  jobName,
		ActiveDeadlineSeconds: activeDeadlineSeconds,
		TTLSecondsAfterFinish: &ttlSecondsAfterFinish,
		WrapLaunchJob:         wrapLaunchJob,
	})
}