"vllm/tool_parsers/deepseekv31_tool_parser.py" did not exist on "4fc1bf813ad80172c1db31264beaef7d93fe0601"
kube.go 3.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package main

import (
	"context"
	"fmt"
	"os"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"sigs.k8s.io/yaml"

	snapshotprotocol "github.com/ai-dynamo/dynamo/deploy/snapshot/protocol"
)

func loadRunContext(ctx context.Context, manifestPath string, namespaceOverride string, kubeContext string) (*corev1.Pod, kubernetes.Interface, string, snapshotprotocol.Storage, error) {
	pod, err := loadPod(manifestPath)
	if err != nil {
		return nil, nil, "", snapshotprotocol.Storage{}, err
	}

	clientset, currentNamespace, err := loadClientset(kubeContext)
	if err != nil {
		return nil, nil, "", snapshotprotocol.Storage{}, err
	}

	namespace := currentNamespace
	if namespace == "" {
		namespace = corev1.NamespaceDefault
	}
	if pod.Namespace != "" {
		namespace = pod.Namespace
	}
	if namespaceOverride != "" {
		namespace = namespaceOverride
	}

	storage, err := discoverSnapshotStorage(ctx, clientset, namespace)
	if err != nil {
		return nil, nil, "", snapshotprotocol.Storage{}, err
	}
	return pod, clientset, namespace, storage, nil
}

func loadClientset(kubeContext string) (kubernetes.Interface, string, error) {
	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
	clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{
		CurrentContext: strings.TrimSpace(kubeContext),
	})
	restConfig, err := clientConfig.ClientConfig()
	if err != nil {
		return nil, "", fmt.Errorf("load kubeconfig: %w", err)
	}
	restConfig.Timeout = 30 * time.Second

	namespace, _, err := clientConfig.Namespace()
	if err != nil {
		return nil, "", fmt.Errorf("resolve current namespace: %w", err)
	}
	if strings.TrimSpace(namespace) == "" {
		namespace = corev1.NamespaceDefault
	}

	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, "", fmt.Errorf("create kubernetes client: %w", err)
	}
	return clientset, namespace, nil
}

func discoverSnapshotStorage(ctx context.Context, clientset kubernetes.Interface, namespace string) (snapshotprotocol.Storage, error) {
	daemonSets, err := clientset.AppsV1().DaemonSets(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: snapshotprotocol.SnapshotAgentLabelSelector,
	})
	if err != nil {
		return snapshotprotocol.Storage{}, fmt.Errorf("list snapshot-agent daemonsets in %s: %w", namespace, err)
	}

	return snapshotprotocol.DiscoverStorageFromDaemonSets(namespace, daemonSets.Items)
}

func loadPod(manifestPath string) (*corev1.Pod, error) {
	content, err := os.ReadFile(manifestPath)
	if err != nil {
		return nil, fmt.Errorf("read manifest %s: %w", manifestPath, err)
	}

	var pod corev1.Pod
	if err := yaml.Unmarshal(content, &pod); err != nil {
		return nil, fmt.Errorf("parse manifest %s: %w", manifestPath, err)
	}
	if kind := strings.TrimSpace(pod.Kind); kind != "" && kind != "Pod" {
		return nil, fmt.Errorf("manifest %s is kind %q, expected Pod", manifestPath, kind)
	}
	if len(pod.Spec.Containers) != 1 {
		return nil, fmt.Errorf(
			"manifest %s has %d containers; snapshotctl requires exactly one worker container",
			manifestPath,
			len(pod.Spec.Containers),
		)
	}
	if strings.TrimSpace(pod.Spec.Containers[0].Image) == "" {
		return nil, fmt.Errorf("manifest %s: worker container image is required", manifestPath)
	}
	if strings.TrimSpace(pod.Name) == "" {
		return nil, fmt.Errorf("manifest %s: metadata.name is required", manifestPath)
	}

	pod.Namespace = strings.TrimSpace(pod.Namespace)
	return &pod, nil
}