main.go 2.15 KB
Newer Older
1
// Package main provides the snapshot DaemonSet agent.
2
3
// The agent watches for pods on its own node that carry checkpoint/restore
// labels and triggers the corresponding operations via the orchestrators.
4
5
6
7
8
9
10
11
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

12
13
14
	"github.com/containerd/containerd"
	"github.com/go-logr/logr"

15
16
17
	"github.com/ai-dynamo/dynamo/deploy/snapshot/pkg/common"
	"github.com/ai-dynamo/dynamo/deploy/snapshot/pkg/logging"
	"github.com/ai-dynamo/dynamo/deploy/snapshot/pkg/watcher"
18
19
20
)

func main() {
21
22
23
	rootLog := logging.ConfigureLogger("stdout")
	agentLog := rootLog.WithName("agent")

24
25
	cfg, err := LoadConfigOrDefault(ConfigMapPath)
	if err != nil {
26
		fatal(agentLog, err, "Failed to load configuration")
27
	}
28
29
	if err := cfg.Validate(); err != nil {
		fatal(agentLog, err, "Invalid configuration")
30
31
	}

32
	ctrd, err := containerd.New(common.ContainerdSocket)
33
	if err != nil {
34
		fatal(agentLog, err, "Failed to connect to containerd")
35
	}
36
	defer ctrd.Close()
37
38
39
40
41
42
43

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

44
	agentLog.Info("Starting snapshot agent",
45
46
47
48
		"node", cfg.NodeName,
		"checkpoint_dir", cfg.BasePath,
		"watch_namespace", cfg.RestrictedNamespace,
	)
49

50
51
52
53
	podWatcher, err := watcher.NewWatcher(cfg, ctrd, rootLog.WithName("watcher"))
	if err != nil {
		fatal(agentLog, err, "Failed to create pod watcher")
	}
54

55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
	// Run watcher in the background
	watcherDone := make(chan error, 1)
	go func() {
		agentLog.Info("Pod watcher started")
		watcherDone <- podWatcher.Start(ctx)
	}()

	// Wait for signal or watcher exit
	select {
	case <-sigChan:
		agentLog.Info("Shutting down")
		cancel()
		select {
		case err := <-watcherDone:
			if err != nil {
				agentLog.Error(err, "Pod watcher exited with error during shutdown")
71
			}
72
		default:
73
		}
74
	case err := <-watcherDone:
75
		if err != nil {
76
			fatal(agentLog, err, "Pod watcher exited with error")
77
		}
78
	}
79

80
81
	agentLog.Info("Agent stopped")
}
82

83
84
85
86
87
// fatal reports a terminal condition through log and then ends the process
// with exit status 1.
//
// When err is non-nil the message is emitted with log.Error; otherwise it is
// emitted with log.Info. keysAndValues is forwarded unchanged to the logger.
// The function never returns, and since it exits via os.Exit, deferred
// functions of the caller are not executed.
func fatal(log logr.Logger, err error, msg string, keysAndValues ...interface{}) {
	switch {
	case err == nil:
		log.Info(msg, keysAndValues...)
	default:
		log.Error(err, msg, keysAndValues...)
	}

	os.Exit(1)
}