observation.go 2.25 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package checkpointjob

import (
	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// ObservationPhase is the coarse state of a checkpoint Job as derived by
// Observe from the Job's conditions and its checkpoint status annotation.
type ObservationPhase string

const (
	// ObservationPhaseRunning means the Job has not reached a terminal
	// condition and no checkpoint failure has been reported.
	ObservationPhaseRunning ObservationPhase = "running"

	// ObservationPhaseWaitingForConfirmation means the Job has completed but
	// completion has not yet been confirmed via the checkpoint status
	// annotation, while the checkpoint worker is still reported as active.
	ObservationPhaseWaitingForConfirmation ObservationPhase = "waiting_for_confirmation"

	// ObservationPhaseReady means the Job has completed and the checkpoint
	// status annotation confirms completion.
	ObservationPhaseReady ObservationPhase = "ready"

	// ObservationPhaseFailed means the checkpoint did not succeed: the Job
	// failed, a failure was reported via the checkpoint status annotation, or
	// the Job completed without confirmation once the worker was inactive.
	ObservationPhaseFailed ObservationPhase = "failed"
)

// Observation is the result of Observe: the derived phase of a checkpoint Job
// plus an optional explanation.
type Observation struct {
	// Phase is the derived state of the checkpoint Job.
	Phase ObservationPhase
	// Reason is a short CamelCase identifier such as "JobFailed",
	// "JobSucceeded" or "CheckpointVerificationFailed". Observe leaves it
	// empty for the running and waiting-for-confirmation phases.
	Reason string
	// Message is a human-readable description accompanying Reason; it is
	// empty whenever Reason is empty.
	Message string
}

// Observe derives the checkpoint Observation for job from two inputs. The
// first is its JobComplete / JobFailed conditions that have status True. The
// second is the value of the snapshotprotocol.CheckpointStatusAnnotation
// annotation on the Job.
//
// Precedence, highest first:
//   - the annotation reports failure: failed (with a verification-failure
//     reason if the Job itself completed);
//   - the Job completed and the annotation reports completion: ready;
//   - the Job completed, no confirmation yet, worker still active: waiting;
//   - the Job completed, no confirmation, worker inactive: failed;
//   - the Job failed: failed;
//   - otherwise: running.
//
// checkpointWorkerActive only matters when the Job has completed without a
// completion annotation. job must be non-nil; it is dereferenced
// unconditionally.
func Observe(job *batchv1.Job, checkpointWorkerActive bool) Observation {
	var complete, failed bool
	for _, cond := range job.Status.Conditions {
		if cond.Status == corev1.ConditionTrue {
			switch cond.Type {
			case batchv1.JobComplete:
				complete = true
			case batchv1.JobFailed:
				failed = true
			}
		}
	}

	// Shared by the "agent reported failure while the Job did not complete"
	// path and the plain "Job failed" path.
	jobFailure := Observation{
		Phase:   ObservationPhaseFailed,
		Reason:  "JobFailed",
		Message: "Checkpoint job failed",
	}

	annotation := job.Annotations[snapshotprotocol.CheckpointStatusAnnotation]
	agentFailed := annotation == snapshotprotocol.CheckpointStatusFailed
	agentCompleted := annotation == snapshotprotocol.CheckpointStatusCompleted

	switch {
	case agentFailed && complete:
		return Observation{
			Phase:   ObservationPhaseFailed,
			Reason:  "CheckpointVerificationFailed",
			Message: "Checkpoint job completed but snapshot-agent reported checkpoint failure",
		}
	case agentFailed:
		return jobFailure
	case complete && agentCompleted:
		return Observation{
			Phase:   ObservationPhaseReady,
			Reason:  "JobSucceeded",
			Message: "Checkpoint job completed successfully",
		}
	case complete && checkpointWorkerActive:
		return Observation{Phase: ObservationPhaseWaitingForConfirmation}
	case complete:
		return Observation{
			Phase:   ObservationPhaseFailed,
			Reason:  "CheckpointVerificationFailed",
			Message: "Checkpoint job completed without snapshot-agent completion confirmation",
		}
	case failed:
		return jobFailure
	default:
		return Observation{Phase: ObservationPhaseRunning}
	}
}