"examples/backends/vllm/launch/agg_omni_image.sh" did not exist on "a77558d442bb35f577cf20be02d7d2b59562cea6"
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package protocol

import (
	"testing"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

14
func TestObserveCheckpointJob(t *testing.T) {
15
16
17
18
19
20
21
22
23
24
	makeJob := func(annotation string, conditions ...batchv1.JobCondition) *batchv1.Job {
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				Annotations: map[string]string{},
			},
			Status: batchv1.JobStatus{
				Conditions: conditions,
			},
		}
		if annotation != "" {
25
			job.Annotations[CheckpointStatusAnnotation] = annotation
26
27
28
29
30
31
32
33
		}
		return job
	}

	tests := []struct {
		name                   string
		job                    *batchv1.Job
		checkpointWorkerActive bool
34
		wantPhase              CheckpointObservationPhase
35
36
37
38
39
40
		wantReason             string
		wantMessage            string
	}{
		{
			name:      "running job stays running",
			job:       makeJob(""),
41
			wantPhase: CheckpointObservationPhaseRunning,
42
43
44
45
		},
		{
			name: "completed job with completion annotation is ready",
			job: makeJob(
46
				CheckpointStatusCompleted,
47
48
				batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
			),
49
			wantPhase:   CheckpointObservationPhaseReady,
50
51
52
53
54
55
56
57
58
59
			wantReason:  "JobSucceeded",
			wantMessage: "Checkpoint job completed successfully",
		},
		{
			name: "completed job waits for terminal confirmation while worker is active",
			job: makeJob(
				"",
				batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
			),
			checkpointWorkerActive: true,
60
			wantPhase:              CheckpointObservationPhaseWaitingForConfirmation,
61
62
63
64
65
66
67
		},
		{
			name: "completed job fails without confirmation once worker is inactive",
			job: makeJob(
				"",
				batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
			),
68
			wantPhase:   CheckpointObservationPhaseFailed,
69
70
71
72
73
74
			wantReason:  "CheckpointVerificationFailed",
			wantMessage: "Checkpoint job completed without snapshot-agent completion confirmation",
		},
		{
			name: "failed checkpoint annotation wins over completed job",
			job: makeJob(
75
				CheckpointStatusFailed,
76
77
78
				batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
			),
			checkpointWorkerActive: true,
79
			wantPhase:              CheckpointObservationPhaseFailed,
80
81
82
83
84
85
86
			wantReason:             "CheckpointVerificationFailed",
			wantMessage:            "Checkpoint job completed but snapshot-agent reported checkpoint failure",
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
87
			observation := ObserveCheckpointJob(tc.job, tc.checkpointWorkerActive)
88
89
90
91
92
93
94
95
96
97
98
99
			if observation.Phase != tc.wantPhase {
				t.Fatalf("phase = %q, want %q", observation.Phase, tc.wantPhase)
			}
			if observation.Reason != tc.wantReason {
				t.Fatalf("reason = %q, want %q", observation.Reason, tc.wantReason)
			}
			if observation.Message != tc.wantMessage {
				t.Fatalf("message = %q, want %q", observation.Message, tc.wantMessage)
			}
		})
	}
}