checkpoint_job.go 5.39 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
package controller
5
6

import (
7
	"context"
8
9
10
11
12
13
	"fmt"

	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
14
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/discovery"
15
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/dra"
16
17
18
19
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
20
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
)

func buildCheckpointWorkerDefaultEnv(
	ckpt *nvidiacomv1alpha1.DynamoCheckpoint,
	podTemplate *corev1.PodTemplateSpec,
) []corev1.EnvVar {
	componentType := consts.ComponentTypeWorker
	dynamoNamespace := consts.GlobalDynamoNamespace
	parentGraphDeploymentName := podTemplate.Labels[consts.KubeLabelDynamoGraphDeploymentName]
	workerHashSuffix := podTemplate.Labels[consts.KubeLabelDynamoWorkerHash]
	discoveryBackend := configv1alpha1.DiscoveryBackendKubernetes

	if podTemplate.Labels[consts.KubeLabelDynamoNamespace] != "" {
		dynamoNamespace = podTemplate.Labels[consts.KubeLabelDynamoNamespace]
	}
	if podTemplate.Labels[consts.KubeLabelDynamoComponentType] != "" &&
		dynamo.IsWorkerComponent(podTemplate.Labels[consts.KubeLabelDynamoComponentType]) {
		componentType = podTemplate.Labels[consts.KubeLabelDynamoComponentType]
	}

	defaultContainer, _ := dynamo.NewWorkerDefaults().GetBaseContainer(dynamo.ComponentContext{
		ComponentType:                  componentType,
		DynamoNamespace:                dynamoNamespace,
		ParentGraphDeploymentName:      parentGraphDeploymentName,
		ParentGraphDeploymentNamespace: ckpt.Namespace,
46
47
48
49
50
		Discovery: dynamo.DiscoveryContext{
			Backend: discoveryBackend,
			Mode:    configv1alpha1.KubeDiscoveryModePod,
		},
		WorkerHashSuffix: workerHashSuffix,
51
52
53
54
	})
	return defaultContainer.Env
}

55
func buildCheckpointJob(
56
57
	ctx context.Context,
	reader ctrlclient.Reader,
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
	config *configv1alpha1.OperatorConfiguration,
	ckpt *nvidiacomv1alpha1.DynamoCheckpoint,
	jobName string,
) (*batchv1.Job, error) {
	podTemplate := ckpt.Spec.Job.PodTemplateSpec.DeepCopy()
	hash := ckpt.Status.IdentityHash
	if hash == "" {
		var err error
		hash, err = checkpoint.ComputeIdentityHash(ckpt.Spec.Identity)
		if err != nil {
			return nil, fmt.Errorf("failed to compute identity hash: %w", err)
		}
	}

	if podTemplate.Labels == nil {
		podTemplate.Labels = make(map[string]string)
	}
	if podTemplate.Annotations == nil {
		podTemplate.Annotations = make(map[string]string)
	}
78
79
80
	if podTemplate.Spec.ServiceAccountName == "" {
		podTemplate.Spec.ServiceAccountName = discovery.GetK8sDiscoveryServiceAccountName(ckpt.Name)
	}
81
82
83

	checkpoint.EnsurePodInfoVolume(&podTemplate.Spec)

84
85
	if len(podTemplate.Spec.Containers) == 0 {
		return nil, fmt.Errorf("checkpoint job requires at least one container")
86
	}
87
	mainContainer := &podTemplate.Spec.Containers[0]
88
89
90
91
92
	mainContainer.Env = dynamo.MergeEnvs(
		buildCheckpointWorkerDefaultEnv(ckpt, podTemplate),
		mainContainer.Env,
	)
	dynamo.AddStandardEnvVars(mainContainer, config)
93

94
95
	checkpoint.EnsurePodInfoMount(mainContainer)
	dynamo.ApplySharedMemoryVolumeAndMount(&podTemplate.Spec, mainContainer, ckpt.Spec.Job.SharedMemory)
96
97
	// NewCheckpointJob handles control volume + readiness probe from the
	// snapshot contract.
98
99

	if ckpt.Spec.GPUMemoryService != nil && ckpt.Spec.GPUMemoryService.Enabled {
100
101
102
103
104
		claimTemplateName := dra.ResourceClaimTemplateName("checkpoint-"+hash, "worker")
		if err := dra.ApplyClaim(&podTemplate.Spec, claimTemplateName); err != nil {
			return nil, fmt.Errorf("failed to apply DRA claim for GMS checkpoint: %w", err)
		}
		storage, err := snapshotprotocol.DiscoverAndResolveStorage(
105
106
107
108
109
110
111
112
113
			ctx,
			reader,
			ckpt.Namespace,
			hash,
			ckpt.Annotations[snapshotprotocol.CheckpointArtifactVersionAnnotation],
		)
		if err != nil {
			return nil, err
		}
114
		if err := checkpoint.EnsureGMSCheckpointJobSidecars(&podTemplate.Spec, mainContainer, storage); err != nil {
115
			return nil, err
116
117
118
119
120
121
122
123
124
		}
	}

	activeDeadlineSeconds := ckpt.Spec.Job.ActiveDeadlineSeconds
	if activeDeadlineSeconds == nil {
		defaultDeadline := int64(3600)
		activeDeadlineSeconds = &defaultDeadline
	}

125
126
127
128
129
130
131
132
133
134
	// Wrap with cuda-checkpoint --launch-job for multi-GPU jobs (TP*PP > 1).
	// Use checkpoint identity (not container limits) because DRA may have
	// already removed nvidia.com/gpu from the template.
	tp := ckpt.Spec.Identity.TensorParallelSize
	pp := ckpt.Spec.Identity.PipelineParallelSize
	if tp == 0 {
		tp = 1
	}
	if pp == 0 {
		pp = 1
135
	}
136
137
	wrapLaunchJob := tp*pp > 1

138
139
140
141
142
143
	ttlSecondsAfterFinish := snapshotprotocol.DefaultCheckpointJobTTLSeconds

	return snapshotprotocol.NewCheckpointJob(podTemplate, snapshotprotocol.CheckpointJobOptions{
		Namespace:             ckpt.Namespace,
		CheckpointID:          hash,
		ArtifactVersion:       snapshotprotocol.ArtifactVersion(ckpt.Annotations[snapshotprotocol.CheckpointArtifactVersionAnnotation]),
144
		SeccompProfile:        snapshotprotocol.DefaultSeccompLocalhostProfile,
145
146
147
148
149
150
		Name:                  jobName,
		ActiveDeadlineSeconds: activeDeadlineSeconds,
		TTLSecondsAfterFinish: &ttlSecondsAfterFinish,
		WrapLaunchJob:         wrapLaunchJob,
	})
}