"vscode:/vscode.git/clone" did not exist on "d46455046317589e741331c0a2ec6a3873c50e1e"
predicate.go 10.3 KB
Newer Older
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller_common

import (
	"context"
	"strings"
	"time"

	commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/client-go/discovery"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

34
35
36
37
38
// ExcludedNamespacesInterface defines the interface for checking namespace exclusions.
// It is the type of Config.ExcludedNamespaces; EphemeralDeploymentEventFilter
// drops events whose namespace Contains reports as excluded.
type ExcludedNamespacesInterface interface {
	// Contains reports whether the given namespace is in the excluded set.
	Contains(namespace string) bool
}

39
40
41
// GroveConfig holds the operator's Grove-related settings.
type GroveConfig struct {
	// Enabled is automatically determined by checking if Grove CRDs are installed in the cluster
	Enabled bool
	// TerminationDelay configures the termination delay for Grove PodCliqueSets
	TerminationDelay time.Duration
}

46
47
48
49
50
// LWSConfig holds the operator's LWS-related settings.
type LWSConfig struct {
	// Enabled is automatically determined by checking if LWS CRDs are installed in the cluster
	Enabled bool
}

51
52
53
54
55
// KaiSchedulerConfig holds the operator's Kai-scheduler-related settings.
type KaiSchedulerConfig struct {
	// Enabled is automatically determined by checking if Kai-scheduler CRDs are installed in the cluster
	Enabled bool
}

56
57
58
59
60
// MpiRunConfig holds the operator's MPI Run settings.
type MpiRunConfig struct {
	// SecretName is the name of the secret containing the SSH key for MPI Run
	SecretName string
}

61
62
63
type Config struct {
	// Enable resources filtering, only the resources belonging to the given namespace will be handled.
	RestrictedNamespace string
64
	Grove               GroveConfig
65
	LWS                 LWSConfig
66
	KaiScheduler        KaiSchedulerConfig
67
68
69
	EtcdAddress         string
	NatsAddress         string
	IngressConfig       IngressConfig
70
71
	// ModelExpressURL is the URL of the Model Express server to inject into all pods
	ModelExpressURL string
72
73
	// PrometheusEndpoint is the URL of the Prometheus endpoint to use for metrics
	PrometheusEndpoint string
74
	MpiRun             MpiRunConfig
75
76
	// RBAC configuration for cross-namespace resource management
	RBAC RBACConfig
77
78
	// ExcludedNamespaces is a thread-safe set of namespaces to exclude (cluster-wide mode only)
	ExcludedNamespaces ExcludedNamespacesInterface
79

80
	// DiscoveryBackend is the discovery backend to use. Default is "kubernetes" for Kubernetes API service discovery. Set to "etcd" to use ETCD for discovery.
81
	DiscoveryBackend string
82
83
84
85
86

	// WebhooksEnabled indicates whether admission webhooks are enabled
	// When true, controllers skip validation (webhooks handle it)
	// When false, controllers perform validation (defense in depth)
	WebhooksEnabled bool
87

88
89
90
91
	// GPUDiscoveryEnabled indicates whether Helm provisioned node read access for the namespace-scoped operator.
	// Only relevant for namespace-scoped operators (RestrictedNamespace != "").
	GPUDiscoveryEnabled bool

92
93
	// Checkpoint configuration for checkpoint/restore functionality
	Checkpoint CheckpointConfig
94
95
96
97
98
99
}

// RBACConfig holds configuration for RBAC management.
// Every field names a ClusterRole and applies to cluster-wide mode only.
type RBACConfig struct {
	// PlannerClusterRoleName is the name of the ClusterRole for planner (cluster-wide mode only)
	PlannerClusterRoleName string
	// DGDRProfilingClusterRoleName is the name of the ClusterRole for DGDR profiling jobs (cluster-wide mode only)
	DGDRProfilingClusterRoleName string
	// EPPClusterRoleName is the name of the ClusterRole for EPP (cluster-wide mode only)
	EPPClusterRoleName string
}

106
107
108
109
110
111
112
113
114
// CheckpointConfig holds configuration for checkpoint/restore functionality
type CheckpointConfig struct {
	// Enabled indicates if checkpoint functionality is enabled
	Enabled bool
	// Storage holds storage backend configuration
	Storage CheckpointStorageConfig
	// InitContainerImage is the image used for init containers (e.g., signal file cleanup)
	// Defaults to "busybox:latest" if not specified
	InitContainerImage string
115
116
117
118
	// ReadyForCheckpointFilePath is the file path used to signal model readiness for checkpoint jobs
	ReadyForCheckpointFilePath string
	// RestoreMarkerFilePath is the marker file path written after successful restore
	RestoreMarkerFilePath string
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
}

// Checkpoint storage type constants.
// These are the backend names listed for CheckpointStorageConfig.Type:
// "pvc", "s3" and "oci".
const (
	CheckpointStorageTypePVC = "pvc"
	CheckpointStorageTypeS3  = "s3"
	CheckpointStorageTypeOCI = "oci"
)

// CheckpointStorageConfig holds storage backend configuration for checkpoints.
// Type selects the backend; the PVC, S3 and OCI sub-structs carry the
// settings used for the matching Type value.
type CheckpointStorageConfig struct {
	// Type is the storage backend type: pvc, s3, or oci
	// (see the CheckpointStorageType* constants).
	Type string
	// SignalHostPath is the host path for signal files (used for checkpoint job coordination)
	SignalHostPath string
	// PVC configuration (used when Type=pvc)
	PVC CheckpointPVCConfig
	// S3 configuration (used when Type=s3)
	S3 CheckpointS3Config
	// OCI configuration (used when Type=oci)
	OCI CheckpointOCIConfig
}

// CheckpointPVCConfig holds PVC storage configuration.
// It is the PVC field of CheckpointStorageConfig (used when Type=pvc).
type CheckpointPVCConfig struct {
	// PVCName is the name of the PVC
	PVCName string
	// BasePath is the base directory within the PVC
	BasePath string
}

// CheckpointS3Config holds S3 storage configuration.
// It is the S3 field of CheckpointStorageConfig (used when Type=s3).
type CheckpointS3Config struct {
	// URI is the S3 URI (s3://[endpoint/]bucket/prefix)
	URI string
	// CredentialsSecretRef is the name of the credentials secret
	// (should contain AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and optionally AWS_REGION)
	CredentialsSecretRef string
}

// CheckpointOCIConfig holds OCI registry storage configuration.
// It is the OCI field of CheckpointStorageConfig (used when Type=oci).
type CheckpointOCIConfig struct {
	// URI is the OCI URI (oci://registry/repository)
	URI string
	// CredentialsSecretRef is the name of the docker config secret
	CredentialsSecretRef string
}

167
168
169
170
171
172
173
174
175
// IngressConfig holds the operator's ingress-related settings.
type IngressConfig struct {
	// VirtualServiceGateway, when non-empty, makes UseVirtualService report true.
	VirtualServiceGateway string
	// IngressControllerClassName is not read in this file.
	// NOTE(review): presumably the ingress class to use — confirm against callers.
	IngressControllerClassName string
	// IngressControllerTLSSecret is not read in this file.
	// NOTE(review): presumably the TLS secret name for ingress — confirm against callers.
	IngressControllerTLSSecret string
	// IngressHostSuffix is not read in this file.
	// NOTE(review): presumably a suffix appended to generated hosts — confirm against callers.
	IngressHostSuffix string
}

// UseVirtualService reports whether a VirtualService gateway is configured,
// i.e. whether VirtualServiceGateway is a non-empty string.
func (i *IngressConfig) UseVirtualService() bool {
	return i.VirtualServiceGateway != ""
}

178
179
180
// DetectGroveAvailability checks if Grove is available by checking if the
// "grove.io" API group is registered in the cluster.
// The lookup is delegated to detectAPIGroupAvailability, which uses the
// discovery client and returns false when detection fails.
func DetectGroveAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	return detectAPIGroupAvailability(ctx, mgr, "grove.io")
}

// DetectLWSAvailability reports whether LWS is available, using registration
// of the "leaderworkerset.x-k8s.io" API group as the signal.
// The lookup is delegated to detectAPIGroupAvailability, which uses the
// discovery client and returns false when detection fails.
func DetectLWSAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const lwsAPIGroup = "leaderworkerset.x-k8s.io"
	return detectAPIGroupAvailability(ctx, mgr, lwsAPIGroup)
}

190
191
192
193
194
195
// DetectVolcanoAvailability reports whether Volcano is available, using
// registration of the "scheduling.volcano.sh" API group as the signal.
// The lookup is delegated to detectAPIGroupAvailability, which uses the
// discovery client and returns false when detection fails.
func DetectVolcanoAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const volcanoAPIGroup = "scheduling.volcano.sh"
	return detectAPIGroupAvailability(ctx, mgr, volcanoAPIGroup)
}

196
197
198
199
200
201
// DetectKaiSchedulerAvailability reports whether Kai-scheduler is available,
// using registration of the "scheduling.run.ai" API group as the signal.
// The lookup is delegated to detectAPIGroupAvailability, which uses the
// discovery client and returns false when detection fails.
func DetectKaiSchedulerAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const kaiSchedulerAPIGroup = "scheduling.run.ai"
	return detectAPIGroupAvailability(ctx, mgr, kaiSchedulerAPIGroup)
}

202
203
204
205
206
207
// DetectInferencePoolAvailability reports whether the Gateway API Inference
// Extension is available, using registration of the
// "inference.networking.k8s.io" API group as the signal.
// The lookup is delegated to detectAPIGroupAvailability, which returns false
// when detection fails.
func DetectInferencePoolAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const inferenceAPIGroup = "inference.networking.k8s.io"
	return detectAPIGroupAvailability(ctx, mgr, inferenceAPIGroup)
}

208
209
// detectAPIGroupAvailability checks if a specific API group is registered in the cluster.
//
// It builds a discovery client from the manager's REST config, lists the
// server's API groups and returns true when one of them is named groupName.
// Every failure (no config, client creation error, listing error) is logged
// with the logger taken from ctx and reported as "not available" (false);
// no error is returned to the caller.
func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName string) bool {
	logger := log.FromContext(ctx)

	cfg := mgr.GetConfig()
	if cfg == nil {
		logger.Info("detection failed, no discovery client available", "group", groupName)
		return false
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
	if err != nil {
		logger.Error(err, "detection failed, could not create discovery client", "group", groupName)
		return false
	}

	apiGroups, err := discoveryClient.ServerGroups()
	if err != nil {
		logger.Error(err, "detection failed, could not list server groups", "group", groupName)
		return false
	}

	for _, group := range apiGroups.Groups {
		if group.Name == groupName {
			logger.Info("API group is available", "group", groupName)
			return true
		}
	}

	logger.Info("API group not available", "group", groupName)
	return false
}
240

241
242
243
244
245
246
247
248
249
250
251
252
253
// IsK8sDiscoveryEnabled reports whether the effective discovery backend, as
// resolved by GetDiscoveryBackend for the given annotations, is "kubernetes".
//
// For DGD, pass in the meta annotations.
// For DCD, pass in the spec annotations.
func (c Config) IsK8sDiscoveryEnabled(annotations map[string]string) bool {
	backend := c.GetDiscoveryBackend(annotations)
	return backend == "kubernetes"
}

// GetDiscoveryBackend returns the discovery backend to use for a resource.
// If the annotations contain the KubeAnnotationDynamoDiscoveryBackend key,
// its value is returned as-is (even when empty); otherwise the configured
// c.DiscoveryBackend is returned.
func (c Config) GetDiscoveryBackend(annotations map[string]string) string {
	override, found := annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend]
	if !found {
		return c.DiscoveryBackend
	}
	return override
}

254
255
256
257
258
259
260
261
262
263
264
265
// EphemeralDeploymentEventFilter returns a predicate that decides, per object,
// whether an event should be processed. The rules are applied in order:
//
//  1. If the object's metadata cannot be extracted, the event is dropped.
//  2. If config.RestrictedNamespace is set, only objects in exactly that
//     namespace pass; no other rule is consulted.
//  3. Otherwise (cluster-wide mode), objects whose namespace is reported by
//     config.ExcludedNamespaces are dropped.
//  4. Finally, objects whose namespace contains the substring "ephemeral"
//     are dropped; everything else passes.
func EphemeralDeploymentEventFilter(config Config) predicate.Predicate {
	return predicate.NewPredicateFuncs(func(o client.Object) bool {
		l := log.FromContext(context.Background())
		objMeta, err := meta.Accessor(o)
		if err != nil {
			l.Error(err, "Error extracting object metadata")
			return false
		}
		namespace := objMeta.GetNamespace()

		if config.RestrictedNamespace != "" {
			// in case of a restricted namespace, we only want to process the events that are in the restricted namespace
			return namespace == config.RestrictedNamespace
		}

		// Cluster-wide mode: check if namespace is excluded
		if config.ExcludedNamespaces != nil && config.ExcludedNamespaces.Contains(namespace) {
			l.V(1).Info("Skipping resource - namespace is excluded",
				"namespace", namespace,
				"resource", objMeta.GetName(),
				"kind", o.GetObjectKind().GroupVersionKind().Kind)
			return false
		}

		// in all other cases, discard the event if it is destined to an ephemeral deployment
		return !strings.Contains(namespace, "ephemeral")
	})
}