"tests/entrypoints/instrumentator/test_sleep.py" did not exist on "74bc397b0a8c092089bdd21e3ec9130336797471"
cuda.go 6.5 KB
Newer Older
1
2
3
4
5
6
7
// Package cuda provides CUDA checkpoint and restore operations.
package cuda

import (
	"context"
	"fmt"
	"os/exec"
8
	"regexp"
9
10
11
12
13
14
15
16
17
18
	"strconv"
	"strings"

	"github.com/go-logr/logr"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)

// nvidiaGPUResource is the extended-resource name that GetPodGPUUUIDs matches
// against kubelet PodResources device entries.
const nvidiaGPUResource = "nvidia.com/gpu"

// nvidiaGPUDRADriver names the NVIDIA DRA driver. It is not referenced in this
// file — presumably used elsewhere in the package; verify before removing.
const nvidiaGPUDRADriver = "gpu.nvidia.com"

var (
	// podResourcesSocketPath is the kubelet PodResources gRPC socket dialed by
	// GetPodGPUUUIDs. NOTE(review): declared as a var rather than a const,
	// likely so tests can point it elsewhere — confirm.
	podResourcesSocketPath = "/var/lib/kubelet/pod-resources/kubelet.sock"

	// gpuUUIDPattern matches a full NVIDIA GPU UUID string of the form
	// GPU-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (hex digits in either case),
	// anchored at both ends.
	gpuUUIDPattern = regexp.MustCompile(`^GPU-[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$`)
)

// GetPodGPUUUIDs resolves GPU UUIDs for a pod/container from kubelet
// PodResources (nvidia.com/gpu entries in GetDevices()).
30
31
32
33
34
func GetPodGPUUUIDs(ctx context.Context, podName, podNamespace, containerName string) ([]string, error) {
	if podName == "" || podNamespace == "" {
		return nil, nil
	}

35
36
	conn, err := grpc.NewClient(
		"unix://"+podResourcesSocketPath,
37
38
39
40
41
42
43
44
45
46
47
48
49
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	client := podresourcesv1.NewPodResourcesListerClient(conn)
	resp, err := client.List(ctx, &podresourcesv1.ListPodResourcesRequest{})
	if err != nil {
		return nil, err
	}

50
	var uuids []string
51
52
53
54
55
56
57
58
59
60
	for _, pod := range resp.GetPodResources() {
		if pod.GetName() != podName || pod.GetNamespace() != podNamespace {
			continue
		}
		for _, container := range pod.GetContainers() {
			if containerName != "" && container.GetName() != containerName {
				continue
			}
			for _, device := range container.GetDevices() {
				if device.GetResourceName() == nvidiaGPUResource {
61
					uuids = append(uuids, device.GetDeviceIds()...)
62
63
				}
			}
64

65
66
67
		}
	}

68
	return uuids, nil
69
70
}

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// GetGPUUUIDsViaNvidiaSmi discovers GPU UUIDs by running nvidia-smi inside the
// container's mount namespace. This is the fallback path when the kubelet
// PodResources API does not report GPU devices (e.g. when GPUs are allocated
// via DRA instead of the NVIDIA device plugin).
func GetGPUUUIDsViaNvidiaSmi(ctx context.Context, hostProcPath string, pid int) ([]string, error) {
	mountPath := fmt.Sprintf("%s/%d/ns/mnt", strings.TrimRight(hostProcPath, "/"), pid)
	cmd := exec.CommandContext(
		ctx,
		"nsenter",
		fmt.Sprintf("--mount=%s", mountPath),
		"--",
		"nvidia-smi", "--query-gpu=gpu_uuid", "--format=csv,noheader",
	)
	output, err := cmd.Output()
	if err != nil {
		return nil, fmt.Errorf("nvidia-smi via nsenter (pid %d) failed: %w", pid, err)
	}
	var uuids []string
	for _, line := range strings.Split(strings.TrimSpace(string(output)), "\n") {
		line = strings.TrimSpace(line)
		if line != "" {
			uuids = append(uuids, line)
		}
	}
	return uuids, nil
}

98
99
100
101
102
// FilterProcesses returns the subset of candidate PIDs that hold actual CUDA contexts.
// Uses --get-restore-tid (the same technique as the CRIU CUDA plugin) instead of
// --get-state, because --get-state incorrectly matches coordinator processes like
// cuda-checkpoint --launch-job that share a /proc namespace with CUDA processes but
// don't hold CUDA contexts themselves.
103
104
105
106
107
108
func FilterProcesses(ctx context.Context, allPIDs []int, log logr.Logger) []int {
	cudaPIDs := make([]int, 0, len(allPIDs))
	for _, pid := range allPIDs {
		if pid <= 0 {
			continue
		}
109
		cmd := exec.CommandContext(ctx, cudaCheckpointHelperBinary, "--get-restore-tid", "--pid", strconv.Itoa(pid))
110
111
		output, err := cmd.CombinedOutput()
		if err != nil {
112
113
114
			if ctx.Err() != nil {
				break
			}
115
			log.V(1).Info("CUDA restore-tid probe negative", "pid", pid)
116
117
			continue
		}
118
119
		tid := strings.TrimSpace(string(output))
		log.V(1).Info("CUDA restore-tid probe positive", "pid", pid, "tid", tid)
120
121
122
123
124
		cudaPIDs = append(cudaPIDs, pid)
	}
	return cudaPIDs
}

125
// BuildDeviceMap creates a cuda-checkpoint-helper --device-map value from source and target GPU UUID lists.
126
127
128
// When a source UUID exists in the target set, it maps to itself (identity mapping) to avoid
// unnecessary cross-GPU restore on same-node restores where kubelet returns GPUs in different order.
// Remaining unmatched source UUIDs are paired with remaining unmatched target UUIDs positionally.
129
func BuildDeviceMap(sourceUUIDs, targetUUIDs []string, log logr.Logger) (string, error) {
130
131
132
133
134
135
	if len(sourceUUIDs) != len(targetUUIDs) {
		return "", fmt.Errorf("GPU count mismatch: source has %d, target has %d", len(sourceUUIDs), len(targetUUIDs))
	}
	if len(sourceUUIDs) == 0 {
		return "", fmt.Errorf("GPU UUID list is empty")
	}
136
	log.V(1).Info("BuildDeviceMap inputs", "source_uuids", sourceUUIDs, "target_uuids", targetUUIDs)
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167

	targetSet := make(map[string]bool, len(targetUUIDs))
	for _, t := range targetUUIDs {
		targetSet[t] = true
	}

	// First pass: identity-map any source UUID that exists in the target set
	mapping := make(map[string]string, len(sourceUUIDs))
	usedTargets := make(map[string]bool, len(targetUUIDs))
	for _, src := range sourceUUIDs {
		if targetSet[src] {
			mapping[src] = src
			usedTargets[src] = true
		}
	}

	// Second pass: pair remaining source UUIDs with remaining target UUIDs positionally
	var remainingTargets []string
	for _, t := range targetUUIDs {
		if !usedTargets[t] {
			remainingTargets = append(remainingTargets, t)
		}
	}
	idx := 0
	for _, src := range sourceUUIDs {
		if _, ok := mapping[src]; !ok {
			mapping[src] = remainingTargets[idx]
			idx++
		}
	}

168
	pairs := make([]string, len(sourceUUIDs))
169
170
	for i, src := range sourceUUIDs {
		pairs[i] = src + "=" + mapping[src]
171
172
173
174
175
	}
	return strings.Join(pairs, ","), nil
}

// LockAndCheckpointProcessTree locks and checkpoints CUDA state for all given PIDs.
176
// On failure, the caller is expected to fail the operation and terminate the workload.
177
178
179
func LockAndCheckpointProcessTree(ctx context.Context, cudaPIDs []int, log logr.Logger) error {
	for _, pid := range cudaPIDs {
		if err := lock(ctx, pid, log); err != nil {
180
			return err
181
182
183
184
185
		}
	}

	for _, pid := range cudaPIDs {
		if err := checkpoint(ctx, pid, log); err != nil {
186
			return err
187
188
189
190
191
192
193
194
195
196
		}
	}

	return nil
}

// RestoreAndUnlockProcessTree restores the CUDA state of every PID in cudaPIDs
// using deviceMap, then unlocks each of them. Every restore finishes before
// the first unlock is attempted.
//
// A restore failure is returned immediately. An unlock failure is tolerated
// (logged, then processing continues) only when a follow-up getState call
// succeeds and reports "running" for that PID. Otherwise the unlock error is
// returned.
func RestoreAndUnlockProcessTree(ctx context.Context, cudaPIDs []int, deviceMap string, log logr.Logger) error {
	for i := range cudaPIDs {
		if restoreErr := restoreProcess(ctx, cudaPIDs[i], deviceMap, log); restoreErr != nil {
			return restoreErr
		}
	}

	for i := range cudaPIDs {
		pid := cudaPIDs[i]
		unlockErr := unlock(ctx, pid, log)
		if unlockErr == nil {
			continue
		}
		// The unlock error is fatal unless the process is verifiably running.
		if state, stateErr := getState(ctx, pid); stateErr != nil || state != "running" {
			return unlockErr
		}
		log.Info("cuda-checkpoint-helper unlock returned error but process is already running", "pid", pid)
	}
	return nil
}