observation_test.go 3.07 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package checkpointjob

import (
	"testing"

	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestObserve is a table-driven test for Observe. Each case builds a
// batchv1.Job with an optional checkpoint status annotation and a set of Job
// conditions, calls Observe with the case's checkpointWorkerActive flag, and
// compares the returned Phase, Reason, and Message with the expected values.
func TestObserve(t *testing.T) {
	// makeJob returns a Job carrying the given conditions. The checkpoint
	// status annotation is set only when annotation is non-empty, so passing ""
	// yields a Job with an empty (but non-nil) annotation map.
	makeJob := func(annotation string, conditions ...batchv1.JobCondition) *batchv1.Job {
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				Annotations: map[string]string{},
			},
			Status: batchv1.JobStatus{
				Conditions: conditions,
			},
		}
		if annotation != "" {
			job.Annotations[snapshotprotocol.CheckpointStatusAnnotation] = annotation
		}
		return job
	}

	// jobComplete is the condition shared by every "completed job" case below.
	jobComplete := batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}

	tests := []struct {
		name                   string
		job                    *batchv1.Job
		checkpointWorkerActive bool
		wantPhase              ObservationPhase
		wantReason             string
		wantMessage            string
	}{
		{
			name:      "running job stays running",
			job:       makeJob(""),
			wantPhase: ObservationPhaseRunning,
		},
		{
			name:        "completed job with completion annotation is ready",
			job:         makeJob(snapshotprotocol.CheckpointStatusCompleted, jobComplete),
			wantPhase:   ObservationPhaseReady,
			wantReason:  "JobSucceeded",
			wantMessage: "Checkpoint job completed successfully",
		},
		{
			name:                   "completed job waits for terminal confirmation while worker is active",
			job:                    makeJob("", jobComplete),
			checkpointWorkerActive: true,
			wantPhase:              ObservationPhaseWaitingForConfirmation,
		},
		{
			name:        "completed job fails without confirmation once worker is inactive",
			job:         makeJob("", jobComplete),
			wantPhase:   ObservationPhaseFailed,
			wantReason:  "CheckpointVerificationFailed",
			wantMessage: "Checkpoint job completed without snapshot-agent completion confirmation",
		},
		{
			name:                   "failed checkpoint annotation wins over completed job",
			job:                    makeJob(snapshotprotocol.CheckpointStatusFailed, jobComplete),
			checkpointWorkerActive: true,
			wantPhase:              ObservationPhaseFailed,
			wantReason:             "CheckpointVerificationFailed",
			wantMessage:            "Checkpoint job completed but snapshot-agent reported checkpoint failure",
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			got := Observe(tc.job, tc.checkpointWorkerActive)

			// The three checks are independent, so use Errorf rather than Fatalf:
			// a single failing case then reports every mismatching field at once.
			if got.Phase != tc.wantPhase {
				t.Errorf("phase = %q, want %q", got.Phase, tc.wantPhase)
			}
			if got.Reason != tc.wantReason {
				t.Errorf("reason = %q, want %q", got.Reason, tc.wantReason)
			}
			if got.Message != tc.wantMessage {
				t.Errorf("message = %q, want %q", got.Message, tc.wantMessage)
			}
		})
	}
}