/*
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller_common

import (
	"context"
	"strings"

24
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
25
	commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
26
	"k8s.io/apimachinery/pkg/api/meta"
27
28
	"k8s.io/client-go/discovery"
	ctrl "sigs.k8s.io/controller-runtime"
29
30
31
32
33
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

34
35
36
37
38
// ExcludedNamespacesInterface defines the interface for checking namespace exclusions.
// NOTE(review): presumably this is the type behind RuntimeConfig.ExcludedNamespaces,
// which EphemeralDeploymentEventFilter consults in cluster-wide mode — confirm
// against the RuntimeConfig declaration.
type ExcludedNamespacesInterface interface {
	// Contains reports whether the given namespace is excluded.
	Contains(namespace string) bool
}

39
40
// DetectGroveAvailability reports whether Grove is available, which is decided
// by looking for the "grove.io" API group among the groups the cluster serves.
// It returns false both when the group is absent and when discovery fails.
func DetectGroveAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const groveAPIGroup = "grove.io"
	return detectAPIGroupAvailability(ctx, mgr, groveAPIGroup)
}

// DetectLWSAvailability reports whether LWS is available, which is decided by
// looking for the "leaderworkerset.x-k8s.io" API group among the groups the
// cluster serves. It returns false both when the group is absent and when
// discovery fails.
func DetectLWSAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const lwsAPIGroup = "leaderworkerset.x-k8s.io"
	return detectAPIGroupAvailability(ctx, mgr, lwsAPIGroup)
}

49
// DetectVolcanoAvailability checks if Volcano is available by checking if the Volcano API group is registered
50
51
52
53
func DetectVolcanoAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	return detectAPIGroupAvailability(ctx, mgr, "scheduling.volcano.sh")
}

54
55
56
57
58
// DetectKaiSchedulerAvailability reports whether Kai-scheduler is available,
// which is decided by looking for the "scheduling.run.ai" API group among the
// groups the cluster serves. It returns false both when the group is absent
// and when discovery fails.
func DetectKaiSchedulerAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const kaiSchedulerAPIGroup = "scheduling.run.ai"
	return detectAPIGroupAvailability(ctx, mgr, kaiSchedulerAPIGroup)
}

59
60
61
62
63
64
// DetectInferencePoolAvailability reports whether the Gateway API Inference
// Extension is available, which is decided by looking for the
// "inference.networking.k8s.io" API group among the groups the cluster serves.
// It returns false both when the group is absent and when discovery fails.
func DetectInferencePoolAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const inferenceAPIGroup = "inference.networking.k8s.io"
	return detectAPIGroupAvailability(ctx, mgr, inferenceAPIGroup)
}

65
66
67
68
69
70
// DetectDRAAvailability reports whether Dynamic Resource Allocation is
// available, which is decided by looking for the "resource.k8s.io" API group
// among the groups the cluster serves (Kubernetes 1.32+). It returns false
// both when the group is absent and when discovery fails.
func DetectDRAAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const draAPIGroup = "resource.k8s.io"
	return detectAPIGroupAvailability(ctx, mgr, draAPIGroup)
}

71
72
// detectAPIGroupAvailability reports whether groupName appears in the list of
// API groups served by the cluster that mgr is configured for.
//
// A discovery client is built from the manager's REST config on every call.
// Any failure along the way (missing config, client construction error, or an
// error listing server groups) is logged and reported as "not available"
// rather than surfaced to the caller.
func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName string) bool {
	lg := log.FromContext(ctx)

	restCfg := mgr.GetConfig()
	if restCfg == nil {
		lg.Info("detection failed, no discovery client available", "group", groupName)
		return false
	}

	dc, err := discovery.NewDiscoveryClientForConfig(restCfg)
	if err != nil {
		lg.Error(err, "detection failed, could not create discovery client", "group", groupName)
		return false
	}

	groupList, err := dc.ServerGroups()
	if err != nil {
		lg.Error(err, "detection failed, could not list server groups", "group", groupName)
		return false
	}

	// Scan by index and stop at the first exact name match.
	found := false
	for i := range groupList.Groups {
		if groupList.Groups[i].Name == groupName {
			found = true
			break
		}
	}

	if !found {
		lg.Info("API group not available", "group", groupName)
		return false
	}

	lg.Info("API group is available", "group", groupName)
	return true
}
103

104
105
106
107
// GetDiscoveryBackend resolves the discovery backend to use for a resource.
// When the annotations carry the discovery-backend annotation key, its value
// is converted and returned as-is (no validation happens here); otherwise the
// supplied discoveryBackend default is returned.
// For DGD, pass in the meta annotations; for DCD, pass in the spec annotations.
func GetDiscoveryBackend(discoveryBackend configv1alpha1.DiscoveryBackend, annotations map[string]string) configv1alpha1.DiscoveryBackend {
	override, found := annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend]
	if !found {
		return discoveryBackend
	}
	return configv1alpha1.DiscoveryBackend(override)
}

// IsK8sDiscoveryEnabled reports whether the backend resolved by
// GetDiscoveryBackend for the given default and annotations is the Kubernetes
// discovery backend.
func IsK8sDiscoveryEnabled(discoveryBackend configv1alpha1.DiscoveryBackend, annotations map[string]string) bool {
	effective := GetDiscoveryBackend(discoveryBackend, annotations)
	return effective == configv1alpha1.DiscoveryBackendKubernetes
}

119
120
121
122
123
124
125
126
// GetKubeDiscoveryMode resolves the kube discovery mode from annotations.
// When the kube-discovery-mode annotation key is present, its value is
// converted and returned as-is (no validation happens here); otherwise pod
// mode is returned.
func GetKubeDiscoveryMode(annotations map[string]string) configv1alpha1.KubeDiscoveryMode {
	raw, found := annotations[commonconsts.KubeAnnotationDynamoKubeDiscoveryMode]
	if !found {
		return configv1alpha1.KubeDiscoveryModePod
	}
	return configv1alpha1.KubeDiscoveryMode(raw)
}

127
128
// EphemeralDeploymentEventFilter builds a predicate that decides, per object,
// whether its events should be processed, based on the object's namespace.
//
// The decision is made in this order:
//  1. If object metadata cannot be extracted, the event is dropped.
//  2. If config.Namespace.Restricted is set, only objects in exactly that
//     namespace pass; nothing else is consulted.
//  3. Otherwise, objects whose namespace is reported by
//     runtimeConfig.ExcludedNamespaces (when non-nil) are dropped.
//  4. Otherwise, objects whose namespace contains "ephemeral" are dropped and
//     everything else passes.
//
// config and runtimeConfig are read each time the predicate runs, not when it
// is built.
func EphemeralDeploymentEventFilter(config *configv1alpha1.OperatorConfiguration, runtimeConfig *RuntimeConfig) predicate.Predicate {
	allow := func(obj client.Object) bool {
		logger := log.FromContext(context.Background())

		accessor, err := meta.Accessor(obj)
		if err != nil {
			logger.Error(err, "Error extracting object metadata")
			return false
		}
		ns := accessor.GetNamespace()

		// Restricted mode: the restricted namespace is the only one we serve.
		if restricted := config.Namespace.Restricted; restricted != "" {
			return ns == restricted
		}

		// Cluster-wide mode: honour the namespace exclusion list, if any.
		if excluded := runtimeConfig.ExcludedNamespaces; excluded != nil && excluded.Contains(ns) {
			logger.V(1).Info("Skipping resource - namespace is excluded",
				"namespace", ns,
				"resource", accessor.GetName(),
				"kind", obj.GetObjectKind().GroupVersionKind().Kind)
			return false
		}

		// Remaining case: pass the event unless it targets an ephemeral deployment.
		return !strings.Contains(ns, "ephemeral")
	}

	return predicate.NewPredicateFuncs(allow)
}