Unverified Commit c0f34b15 authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

feat(operator): operator config versioning and injected as configmap (#6464)

parent 59c5f6f1
......@@ -44,6 +44,7 @@ spec:
{{- include "dynamo-operator.selectorLabels" . | nindent 8 }}
annotations:
kubectl.kubernetes.io/default-container: manager
checksum/config: {{ include (print $.Template.BasePath "/operator-config.yaml") . | sha256sum }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
......@@ -77,95 +78,8 @@ spec:
securityContext: {{- toYaml .Values.controllerManager.kubeRbacProxy.containerSecurityContext
| nindent 10 }}
- args:
{{- range .Values.controllerManager.manager.args }}
- {{ . }}
{{- end }}
- --leader-elect
- --leader-election-id={{ default "dynamo.nvidia.com" .Values.controllerManager.leaderElection.id }}
{{- if .Values.namespaceRestriction.enabled }}
{{- $restrictedNs := default .Release.Namespace .Values.namespaceRestriction.targetNamespace }}
- --restrictedNamespace={{ $restrictedNs }}
- --leader-election-namespace={{ $restrictedNs }}
{{- else }}
- --leader-election-namespace={{ default "kube-system" .Values.controllerManager.leaderElection.namespace }}
{{- end }}
{{- if .Values.natsAddr }}
- --natsAddr={{ .Values.natsAddr }}
{{- else if and .Values.nats .Values.nats.enabled }}
- --natsAddr=nats://{{ .Release.Name }}-nats.{{ .Release.Namespace }}.svc.cluster.local:4222
{{- end }}
{{- if .Values.etcdAddr }}
- --etcdAddr={{ .Values.etcdAddr }}
{{- else if and .Values.global .Values.global.etcd .Values.global.etcd.install }}
- --etcdAddr={{ .Release.Name }}-etcd.{{ .Release.Namespace }}.svc.cluster.local:2379
{{- end }}
{{- if and .Values.dynamo.istio.enabled .Values.dynamo.istio.gateway }}
- --istio-virtual-service-gateway={{ .Values.dynamo.istio.gateway }}
{{- end }}
{{- if .Values.dynamo.ingress.enabled }}
{{- if .Values.dynamo.ingress.className }}
- --ingress-controller-class-name={{ .Values.dynamo.ingress.className }}
{{- end }}
{{- if .Values.dynamo.ingress.tlsSecretName }}
- --ingress-controller-tls-secret-name={{ .Values.dynamo.ingress.tlsSecretName }}
{{- end }}
{{- end }}
{{- if .Values.dynamo.ingressHostSuffix }}
- --ingress-host-suffix={{ .Values.dynamo.ingressHostSuffix }}
{{- end }}
{{- if .Values.dynamo.virtualServiceSupportsHTTPS }}
- --virtual-service-supports-https={{ .Values.dynamo.virtualServiceSupportsHTTPS }}
{{- end }}
{{- if .Values.dynamo.groveTerminationDelay }}
- --grove-termination-delay={{ .Values.dynamo.groveTerminationDelay }}
{{- end }}
{{- if .Values.modelExpressURL }}
- --model-express-url={{ .Values.modelExpressURL }}
{{- end }}
{{- if .Values.dynamo.metrics.prometheusEndpoint }}
- --prometheus-endpoint={{ .Values.dynamo.metrics.prometheusEndpoint }}
{{- end }}
{{- if .Values.dynamo.mpiRun.secretName }}
- --mpi-run-ssh-secret-name={{ .Values.dynamo.mpiRun.secretName }}
- --mpi-run-ssh-secret-namespace={{ .Release.Namespace }}
{{- end }}
{{- if not .Values.namespaceRestriction.enabled }}
- --dgdr-profiling-cluster-role-name={{ include "dynamo-operator.fullname" . }}-dgdr-profiling
- --planner-cluster-role-name={{ include "dynamo-operator.fullname" . }}-planner
- --epp-cluster-role-name={{ include "dynamo-operator.fullname" . }}-epp
{{- end }}
{{- if .Values.discoveryBackend }}
- --discovery-backend={{ .Values.discoveryBackend }}
{{- end }}
{{- if .Values.namespaceRestriction.enabled }}
{{- if .Values.namespaceRestriction.lease }}
- --namespace-scope-lease-duration={{ .Values.namespaceRestriction.lease.duration }}
- --namespace-scope-lease-renew-interval={{ .Values.namespaceRestriction.lease.renewInterval }}
{{- end }}
- --gpu-discovery-enabled={{ .Values.gpuDiscovery.enabled }}
{{- end }}
- --operator-version={{ .Chart.AppVersion }}
{{- if .Values.checkpoint.enabled }}
- --checkpoint-enabled=true
- --checkpoint-storage-type={{ .Values.checkpoint.storage.type }}
- --checkpoint-ready-for-checkpoint-file-path={{ .Values.checkpoint.readyForCheckpointFilePath }}
{{- if eq .Values.checkpoint.storage.type "pvc" }}
- --checkpoint-pvc-name={{ .Values.checkpoint.storage.pvc.pvcName }}
- --checkpoint-pvc-base-path={{ .Values.checkpoint.storage.pvc.basePath }}
{{- end }}
{{- if eq .Values.checkpoint.storage.type "s3" }}
- --checkpoint-s3-uri={{ .Values.checkpoint.storage.s3.uri }}
{{- if .Values.checkpoint.storage.s3.credentialsSecretRef }}
- --checkpoint-s3-credentials-secret={{ .Values.checkpoint.storage.s3.credentialsSecretRef }}
{{- end }}
{{- end }}
{{- if eq .Values.checkpoint.storage.type "oci" }}
- --checkpoint-oci-uri={{ .Values.checkpoint.storage.oci.uri }}
{{- if .Values.checkpoint.storage.oci.credentialsSecretRef }}
- --checkpoint-oci-credentials-secret={{ .Values.checkpoint.storage.oci.credentialsSecretRef }}
{{- end }}
{{- end }}
{{- end }}
- --config=/etc/dynamo-operator/config.yaml
- --operator-version={{ .Chart.AppVersion }}
command:
- /manager
env:
......@@ -196,6 +110,9 @@ spec:
securityContext: {{- toYaml .Values.controllerManager.manager.containerSecurityContext
| nindent 10 }}
volumeMounts:
- mountPath: /etc/dynamo-operator
name: operator-config
readOnly: true
- mountPath: /tmp/k8s-webhook-server/serving-certs
name: cert
readOnly: true
......@@ -204,6 +121,9 @@ spec:
serviceAccountName: {{ include "dynamo-operator.fullname" . }}-controller-manager
terminationGracePeriodSeconds: 30
volumes:
- name: operator-config
configMap:
name: {{ include "dynamo-operator.fullname" . }}-config
- name: cert
secret:
defaultMode: 420
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ConfigMap that carries the operator's OperatorConfiguration file.
# The manager Deployment mounts this ConfigMap read-only at /etc/dynamo-operator
# and starts the manager with --config=/etc/dynamo-operator/config.yaml.
# The Deployment pod template also carries a checksum/config annotation computed
# from this template (presumably so pods roll when the rendered config changes).
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "dynamo-operator.fullname" . }}-config
labels:
{{- include "dynamo-operator.labels" . | nindent 4 }}
# config.yaml below is rendered from chart values. Several settings are only
# emitted when they differ from the operator's built-in default (for example
# lease durations, grove termination delay, checkpoint paths, discovery backend).
data:
config.yaml: |
apiVersion: operator.config.dynamo.nvidia.com/v1alpha1
kind: OperatorConfiguration
server:
metrics:
bindAddress: "127.0.0.1"
leaderElection:
enabled: true
id: {{ default "dynamo.nvidia.com" .Values.controllerManager.leaderElection.id | quote }}
{{- if .Values.namespaceRestriction.enabled }}
namespace: {{ default .Release.Namespace .Values.namespaceRestriction.targetNamespace | quote }}
{{- else }}
namespace: {{ default "kube-system" .Values.controllerManager.leaderElection.namespace | quote }}
{{- end }}
{{- if .Values.namespaceRestriction.enabled }}
namespace:
restricted: {{ default .Release.Namespace .Values.namespaceRestriction.targetNamespace | quote }}
{{- if .Values.namespaceRestriction.lease }}
{{- if or (ne (.Values.namespaceRestriction.lease.duration | toString) "30s") (ne (.Values.namespaceRestriction.lease.renewInterval | toString) "10s") }}
scope:
{{- if ne (.Values.namespaceRestriction.lease.duration | toString) "30s" }}
leaseDuration: {{ .Values.namespaceRestriction.lease.duration | quote }}
{{- end }}
{{- if ne (.Values.namespaceRestriction.lease.renewInterval | toString) "10s" }}
leaseRenewInterval: {{ .Values.namespaceRestriction.lease.renewInterval | quote }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{- if and .Values.dynamo.groveTerminationDelay (ne (.Values.dynamo.groveTerminationDelay | toString) "15m") }}
orchestrators:
grove:
terminationDelay: {{ .Values.dynamo.groveTerminationDelay | quote }}
{{- end }}
{{- $natsAddr := "" }}
{{- if .Values.natsAddr }}
{{- $natsAddr = .Values.natsAddr }}
{{- else if and .Values.nats .Values.nats.enabled }}
{{- $natsAddr = printf "nats://%s-nats.%s.svc.cluster.local:4222" .Release.Name .Release.Namespace }}
{{- end }}
{{- $etcdAddr := "" }}
{{- if .Values.etcdAddr }}
{{- $etcdAddr = .Values.etcdAddr }}
{{- else if and .Values.global .Values.global.etcd .Values.global.etcd.install }}
{{- $etcdAddr = printf "%s-etcd.%s.svc.cluster.local:2379" .Release.Name .Release.Namespace }}
{{- end }}
{{- if or $natsAddr $etcdAddr .Values.modelExpressURL .Values.dynamo.metrics.prometheusEndpoint }}
infrastructure:
{{- if $natsAddr }}
natsAddress: {{ $natsAddr | quote }}
{{- end }}
{{- if $etcdAddr }}
etcdAddress: {{ $etcdAddr | quote }}
{{- end }}
{{- if .Values.modelExpressURL }}
modelExpressURL: {{ .Values.modelExpressURL | quote }}
{{- end }}
{{- if .Values.dynamo.metrics.prometheusEndpoint }}
prometheusEndpoint: {{ .Values.dynamo.metrics.prometheusEndpoint | quote }}
{{- end }}
{{- end }}
{{- $vsGateway := "" }}
{{- if and (hasKey .Values.dynamo "istio") .Values.dynamo.istio .Values.dynamo.istio.enabled .Values.dynamo.istio.gateway }}
{{- $vsGateway = .Values.dynamo.istio.gateway }}
{{- end }}
{{- $ingressClassName := "" }}
{{- $ingressTLSSecret := "" }}
{{- if and (hasKey .Values.dynamo "ingress") .Values.dynamo.ingress .Values.dynamo.ingress.enabled }}
{{- $ingressClassName = .Values.dynamo.ingress.className }}
{{- $ingressTLSSecret = .Values.dynamo.ingress.tlsSecretName }}
{{- end }}
{{- $ingressHostSuffix := "" }}
{{- if and (hasKey .Values.dynamo "ingressHostSuffix") .Values.dynamo.ingressHostSuffix }}
{{- $ingressHostSuffix = .Values.dynamo.ingressHostSuffix }}
{{- end }}
{{- if or $vsGateway $ingressClassName $ingressTLSSecret $ingressHostSuffix }}
ingress:
{{- if $vsGateway }}
virtualServiceGateway: {{ $vsGateway | quote }}
{{- end }}
{{- if $ingressClassName }}
controllerClassName: {{ $ingressClassName | quote }}
{{- end }}
{{- if $ingressTLSSecret }}
controllerTLSSecretName: {{ $ingressTLSSecret | quote }}
{{- end }}
{{- if $ingressHostSuffix }}
hostSuffix: {{ $ingressHostSuffix | quote }}
{{- end }}
{{- end }}
{{- if not .Values.namespaceRestriction.enabled }}
rbac:
plannerClusterRoleName: {{ include "dynamo-operator.fullname" . }}-planner
dgdrProfilingClusterRoleName: {{ include "dynamo-operator.fullname" . }}-dgdr-profiling
eppClusterRoleName: {{ include "dynamo-operator.fullname" . }}-epp
{{- end }}
{{- if .Values.dynamo.mpiRun.secretName }}
mpi:
sshSecretName: {{ .Values.dynamo.mpiRun.secretName | quote }}
sshSecretNamespace: {{ .Release.Namespace | quote }}
{{- end }}
{{- if .Values.checkpoint.enabled }}
checkpoint:
enabled: true
{{- if ne (.Values.checkpoint.readyForCheckpointFilePath | toString) "/tmp/ready-for-checkpoint" }}
readyForCheckpointFilePath: {{ .Values.checkpoint.readyForCheckpointFilePath | quote }}
{{- end }}
storage:
{{- if ne (.Values.checkpoint.storage.type | toString) "pvc" }}
type: {{ .Values.checkpoint.storage.type | quote }}
{{- end }}
{{- if eq .Values.checkpoint.storage.type "pvc" }}
{{- if or (ne (.Values.checkpoint.storage.pvc.pvcName | toString) "chrek-pvc") (ne (.Values.checkpoint.storage.pvc.basePath | toString) "/checkpoints") }}
pvc:
{{- if ne (.Values.checkpoint.storage.pvc.pvcName | toString) "chrek-pvc" }}
pvcName: {{ .Values.checkpoint.storage.pvc.pvcName | quote }}
{{- end }}
{{- if ne (.Values.checkpoint.storage.pvc.basePath | toString) "/checkpoints" }}
basePath: {{ .Values.checkpoint.storage.pvc.basePath | quote }}
{{- end }}
{{- end }}
{{- end }}
{{- if eq .Values.checkpoint.storage.type "s3" }}
s3:
uri: {{ .Values.checkpoint.storage.s3.uri | quote }}
{{- if .Values.checkpoint.storage.s3.credentialsSecretRef }}
credentialsSecretRef: {{ .Values.checkpoint.storage.s3.credentialsSecretRef | quote }}
{{- end }}
{{- end }}
{{- if eq .Values.checkpoint.storage.type "oci" }}
oci:
uri: {{ .Values.checkpoint.storage.oci.uri | quote }}
{{- if .Values.checkpoint.storage.oci.credentialsSecretRef }}
credentialsSecretRef: {{ .Values.checkpoint.storage.oci.credentialsSecretRef | quote }}
{{- end }}
{{- end }}
{{- end }}
{{- if and .Values.discoveryBackend (ne (.Values.discoveryBackend | toString) "kubernetes") }}
discovery:
backend: {{ .Values.discoveryBackend | quote }}
{{- end }}
{{- if .Values.namespaceRestriction.enabled }}
gpu:
discoveryEnabled: {{ .Values.gpuDiscovery.enabled }}
{{- end }}
......@@ -81,11 +81,6 @@ controllerManager:
cpu: 5m
memory: 64Mi
manager:
args:
- --health-probe-bind-address=:8081
- --metrics-bind-address=127.0.0.1:8080
- --leader-elect
- --leader-election-id=dynamo.nvidia.com
containerSecurityContext:
allowPrivilegeEscalation: false
capabilities:
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package v1alpha1
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)
// SetDefaultsOperatorConfiguration fills in every field of obj that is still at
// its zero value and has a built-in default. Fields that already carry a value
// are never overwritten.
//
// IMPORTANT: the values used here are mirrored by the +kubebuilder:default=
// markers in types.go. Change both places together to keep API docs in sync.
func SetDefaultsOperatorConfiguration(obj *OperatorConfiguration) {
	// Local setters: each writes the fallback only when the target is unset.
	orString := func(dst *string, fallback string) {
		if *dst == "" {
			*dst = fallback
		}
	}
	orInt := func(dst *int, fallback int) {
		if *dst == 0 {
			*dst = fallback
		}
	}
	orDuration := func(dst *metav1.Duration, fallback time.Duration) {
		if dst.Duration == 0 {
			*dst = metav1.Duration{Duration: fallback}
		}
	}

	// Server endpoints: metrics, health probe and webhook.
	srv := &obj.Server
	orInt(&srv.Metrics.Port, 8080)
	orString(&srv.Metrics.BindAddress, "127.0.0.1")
	orInt(&srv.HealthProbe.Port, 8081)
	orString(&srv.HealthProbe.BindAddress, "0.0.0.0")
	orString(&srv.Webhook.Host, "0.0.0.0")
	orInt(&srv.Webhook.Port, 9443)
	orString(&srv.Webhook.CertDir, "/tmp/k8s-webhook-server/serving-certs")

	// Grove orchestrator.
	orDuration(&obj.Orchestrators.Grove.TerminationDelay, 15*time.Minute)

	// Namespace scope lease timings.
	scope := &obj.Namespace.Scope
	orDuration(&scope.LeaseDuration, 30*time.Second)
	orDuration(&scope.LeaseRenewInterval, 10*time.Second)

	// Discovery backend.
	if backend := &obj.Discovery.Backend; *backend == "" {
		*backend = DiscoveryBackendKubernetes
	}

	// GPU discovery: nil means "not specified", which defaults to enabled.
	if enabled := &obj.GPU.DiscoveryEnabled; *enabled == nil {
		*enabled = ptr.To(true)
	}

	// Checkpoint settings. The PVC defaults are applied regardless of the
	// selected storage type.
	ckpt := &obj.Checkpoint
	orString(&ckpt.ReadyForCheckpointFilePath, "/tmp/ready-for-checkpoint")
	orString(&ckpt.Storage.Type, CheckpointStorageTypePVC)
	orString(&ckpt.Storage.PVC.PVCName, "chrek-pvc")
	orString(&ckpt.Storage.PVC.BasePath, "/checkpoints")

	// Logging.
	orString(&obj.Logging.Level, "info")
	orString(&obj.Logging.Format, "json")
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// +kubebuilder:object:generate=true
// +groupName=operator.config.dynamo.nvidia.com
package v1alpha1
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// SchemeGroupVersion is the API group and version under which the operator
// configuration types of this package are registered.
var SchemeGroupVersion = schema.GroupVersion{Group: "operator.config.dynamo.nvidia.com", Version: "v1alpha1"}

// SchemeBuilder collects the functions that install this package's known types
// and defaulting functions into a runtime.Scheme.
var SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes, addDefaultingFuncs)

// AddToScheme applies everything collected in SchemeBuilder to the given scheme.
var AddToScheme = SchemeBuilder.AddToScheme
// addKnownTypes registers OperatorConfiguration with scheme under
// SchemeGroupVersion. It always returns nil; the error result exists only to
// match the function signature expected by the scheme builder.
func addKnownTypes(scheme *runtime.Scheme) error {
	operatorConfig := new(OperatorConfiguration)
	scheme.AddKnownTypes(SchemeGroupVersion, operatorConfig)
	return nil
}
// addDefaultingFuncs is the scheme-builder hook that installs this package's
// defaulting functions by delegating to RegisterDefaults.
func addDefaultingFuncs(scheme *runtime.Scheme) error {
	if err := RegisterDefaults(scheme); err != nil {
		return err
	}
	return nil
}
// RegisterDefaults installs the defaulting function for OperatorConfiguration
// into the given scheme. It is exported so callers can assemble their own
// schemes. It always returns nil.
//
// The registered function type-asserts its argument to *OperatorConfiguration
// without a check, so it panics if handed any other type.
func RegisterDefaults(scheme *runtime.Scheme) error {
	applyDefaults := func(obj interface{}) {
		cfg := obj.(*OperatorConfiguration)
		SetDefaultsOperatorConfiguration(cfg)
	}
	scheme.AddTypeDefaultingFunc(&OperatorConfiguration{}, applyDefaults)
	return nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Checkpoint storage type constants.
//
// These are the values documented for CheckpointStorageConfiguration.Type.
// SetDefaultsOperatorConfiguration falls back to CheckpointStorageTypePVC when
// that field is empty.
const (
	// CheckpointStorageTypePVC selects PVC-backed checkpoint storage.
	CheckpointStorageTypePVC = "pvc"
	// CheckpointStorageTypeS3 selects S3-backed checkpoint storage.
	CheckpointStorageTypeS3 = "s3"
	// CheckpointStorageTypeOCI selects OCI-registry-backed checkpoint storage.
	CheckpointStorageTypeOCI = "oci"
)
// +kubebuilder:object:root=true
// OperatorConfiguration is the Schema for the operator configuration.
//
// It is the root object of the operator's config file (kind
// OperatorConfiguration in group-version
// operator.config.dynamo.nvidia.com/v1alpha1). Fields left unset are filled in
// by SetDefaultsOperatorConfiguration wherever a default exists.
type OperatorConfiguration struct {
	// TypeMeta carries apiVersion and kind; it is inlined at the top level.
	metav1.TypeMeta `json:",inline"`
	// Server configuration (metrics, health probes, webhooks)
	Server ServerConfiguration `json:"server"`
	// Leader election configuration
	LeaderElection LeaderElectionConfiguration `json:"leaderElection"`
	// Namespace configuration (restricted vs cluster-wide)
	Namespace NamespaceConfiguration `json:"namespace"`
	// Orchestrator configuration with optional overrides
	Orchestrators OrchestratorConfiguration `json:"orchestrators"`
	// Addresses of backing infrastructure services (NATS, etcd, Model Express, Prometheus)
	Infrastructure InfrastructureConfiguration `json:"infrastructure"`
	// Ingress configuration
	Ingress IngressConfiguration `json:"ingress"`
	// RBAC configuration for cross-namespace resource management (cluster-wide mode)
	RBAC RBACConfiguration `json:"rbac"`
	// MPI SSH secret configuration
	MPI MPIConfiguration `json:"mpi"`
	// Checkpoint/restore configuration
	Checkpoint CheckpointConfiguration `json:"checkpoint"`
	// Discovery backend configuration
	Discovery DiscoveryConfiguration `json:"discovery"`
	// GPU discovery configuration
	GPU GPUConfiguration `json:"gpu"`
	// Logging configuration
	Logging LoggingConfiguration `json:"logging"`
	// HTTP/2 and TLS settings
	Security SecurityConfiguration `json:"security"`
}
// ServerConfiguration holds server bind addresses and ports.
//
// The +kubebuilder:default markers on the fields mirror the values that
// SetDefaultsOperatorConfiguration applies to unset fields.
type ServerConfiguration struct {
	// Metrics server configuration
	// +kubebuilder:default={bindAddress: "127.0.0.1", port: 8080}
	Metrics MetricsServer `json:"metrics"`
	// Health probe server configuration
	// +kubebuilder:default={bindAddress: "0.0.0.0", port: 8081}
	HealthProbe Server `json:"healthProbe"`
	// Webhook server configuration
	// +kubebuilder:default={host: "0.0.0.0", port: 9443, certDir: "/tmp/k8s-webhook-server/serving-certs"}
	Webhook WebhookServer `json:"webhook"`
}
// Server holds a bind address and port.
//
// It is used directly for the health probe server and embedded by
// MetricsServer and WebhookServer.
type Server struct {
	// BindAddress is the address the server binds to
	BindAddress string `json:"bindAddress"`
	// Port is the port the server listens on.
	// A zero port is treated as unset by SetDefaultsOperatorConfiguration.
	Port int `json:"port"`
}
// MetricsServer extends Server with secure serving option.
type MetricsServer struct {
	// Server supplies bindAddress and port (defaulted to 127.0.0.1 and 8080).
	Server `json:",inline"`
	// Secure enables secure serving for the metrics endpoint.
	// No default is applied, so it stays false unless set explicitly.
	Secure bool `json:"secure"`
}
// WebhookServer extends Server with host and certificate directory.
//
// SetDefaultsOperatorConfiguration defaults Host, Port and CertDir; the
// embedded BindAddress is not defaulted.
// NOTE(review): how BindAddress relates to Host for the webhook server is not
// visible here — confirm which one the manager actually uses.
type WebhookServer struct {
	// Server supplies bindAddress and port (port defaults to 9443).
	Server `json:",inline"`
	// Host is the address the webhook server binds to (defaults to "0.0.0.0")
	Host string `json:"host"`
	// CertDir is the directory containing TLS certificates
	// (defaults to "/tmp/k8s-webhook-server/serving-certs")
	CertDir string `json:"certDir"`
}
// LeaderElectionConfiguration holds leader election settings.
//
// SetDefaultsOperatorConfiguration applies no defaults to these fields, so an
// unset ID or Namespace stays empty and Enabled stays false.
type LeaderElectionConfiguration struct {
	// Enabled enables leader election for controller manager
	// +kubebuilder:default=false
	Enabled bool `json:"enabled"`
	// ID is the leader election resource identity
	ID string `json:"id"`
	// Namespace is the namespace for the leader election resource
	Namespace string `json:"namespace"`
}
// NamespaceConfiguration determines operator namespace mode.
type NamespaceConfiguration struct {
	// Restricted is the namespace to restrict to. Empty = cluster-wide mode.
	Restricted string `json:"restricted"`
	// Scope holds namespace scope lease settings (namespace-restricted mode only).
	// Its durations are defaulted even when Restricted is empty.
	Scope NamespaceScopeConfiguration `json:"scope"`
}
// NamespaceScopeConfiguration holds lease settings for namespace-restricted mode.
//
// A zero duration is treated as unset and replaced by the default in
// SetDefaultsOperatorConfiguration.
type NamespaceScopeConfiguration struct {
	// LeaseDuration is the duration of namespace scope marker lease before expiration
	// +kubebuilder:default="30s"
	LeaseDuration metav1.Duration `json:"leaseDuration"`
	// LeaseRenewInterval is the interval for renewing namespace scope marker lease
	// +kubebuilder:default="10s"
	LeaseRenewInterval metav1.Duration `json:"leaseRenewInterval"`
}
// OrchestratorConfiguration holds orchestrator override settings.
//
// Each orchestrator has an optional Enabled override; nil means auto-detect.
type OrchestratorConfiguration struct {
	// Grove orchestrator configuration
	Grove GroveConfiguration `json:"grove"`
	// LWS orchestrator configuration
	LWS LWSConfiguration `json:"lws"`
	// KaiScheduler configuration
	KaiScheduler KaiSchedulerConfiguration `json:"kaiScheduler"`
}
// GroveConfiguration holds Grove orchestrator settings.
type GroveConfiguration struct {
	// Enabled overrides auto-detection. nil = auto-detect.
	// It is a pointer so that an explicit false can be told apart from unset.
	Enabled *bool `json:"enabled,omitempty"`
	// TerminationDelay configures the termination delay for Grove PodCliqueSets.
	// A zero value is replaced by 15 minutes in SetDefaultsOperatorConfiguration.
	// +kubebuilder:default="15m"
	TerminationDelay metav1.Duration `json:"terminationDelay"`
}
// LWSConfiguration holds LWS orchestrator settings.
type LWSConfiguration struct {
	// Enabled overrides auto-detection. nil = auto-detect.
	// It is a pointer so that an explicit false can be told apart from unset.
	Enabled *bool `json:"enabled,omitempty"`
}
// KaiSchedulerConfiguration holds Kai-scheduler settings.
type KaiSchedulerConfiguration struct {
	// Enabled overrides auto-detection. nil = auto-detect.
	// It is a pointer so that an explicit false can be told apart from unset.
	Enabled *bool `json:"enabled,omitempty"`
}
// InfrastructureConfiguration holds the addresses of backend services (NATS,
// etcd, Model Express, Prometheus).
//
// No defaults are applied to these fields; an empty string means not configured.
type InfrastructureConfiguration struct {
	// NATSAddress is the address of the NATS server
	NATSAddress string `json:"natsAddress"`
	// ETCDAddress is the address of the etcd server
	ETCDAddress string `json:"etcdAddress"`
	// ModelExpressURL is the URL of the Model Express server to inject into all pods
	ModelExpressURL string `json:"modelExpressURL"`
	// PrometheusEndpoint is the URL of the Prometheus endpoint to use for metrics
	PrometheusEndpoint string `json:"prometheusEndpoint"`
}
// IngressConfiguration groups the settings that control how services are
// exposed, either through an Istio VirtualService gateway or through an
// ingress controller.
type IngressConfiguration struct {
	// VirtualServiceGateway names the Istio virtual service gateway.
	VirtualServiceGateway string `json:"virtualServiceGateway"`
	// ControllerClassName names the ingress controller class.
	ControllerClassName string `json:"controllerClassName"`
	// ControllerTLSSecretName names the TLS secret used by the ingress controller.
	ControllerTLSSecretName string `json:"controllerTLSSecretName"`
	// HostSuffix is appended to ingress hostnames.
	HostSuffix string `json:"hostSuffix"`
}

// UseVirtualService reports whether a VirtualService gateway has been
// configured, i.e. whether VirtualServiceGateway is non-empty.
func (i *IngressConfiguration) UseVirtualService() bool {
	return len(i.VirtualServiceGateway) > 0
}
// RBACConfiguration holds RBAC settings for cluster-wide mode.
//
// The Helm chart only renders this section when namespace restriction is
// disabled; no defaults are applied to these fields.
type RBACConfiguration struct {
	// PlannerClusterRoleName is the ClusterRole for planner
	PlannerClusterRoleName string `json:"plannerClusterRoleName"`
	// DGDRProfilingClusterRoleName is the ClusterRole for DGDR profiling jobs
	DGDRProfilingClusterRoleName string `json:"dgdrProfilingClusterRoleName"`
	// EPPClusterRoleName is the ClusterRole for EPP
	EPPClusterRoleName string `json:"eppClusterRoleName"`
}
// MPIConfiguration holds MPI SSH secret settings.
//
// No defaults are applied to these fields; the Helm chart sets the namespace to
// the release namespace whenever a secret name is configured.
type MPIConfiguration struct {
	// SSHSecretName is the name of the secret containing the SSH key for MPI
	SSHSecretName string `json:"sshSecretName"`
	// SSHSecretNamespace is the namespace where the MPI SSH secret is located
	SSHSecretNamespace string `json:"sshSecretNamespace"`
}
// CheckpointConfiguration holds checkpoint/restore settings.
type CheckpointConfiguration struct {
	// Enabled indicates if checkpoint functionality is enabled.
	// No default is applied, so it is false unless set explicitly.
	Enabled bool `json:"enabled"`
	// ReadyForCheckpointFilePath signals model readiness for checkpoint jobs.
	// An empty value is replaced by the default in SetDefaultsOperatorConfiguration.
	// +kubebuilder:default="/tmp/ready-for-checkpoint"
	ReadyForCheckpointFilePath string `json:"readyForCheckpointFilePath"`
	// Storage holds storage backend configuration
	Storage CheckpointStorageConfiguration `json:"storage"`
}
// CheckpointStorageConfiguration holds storage backend configuration for checkpoints.
//
// Type selects which of the PVC, S3 or OCI sub-sections is used.
type CheckpointStorageConfiguration struct {
	// Type is the storage backend type: pvc, s3, or oci
	// (see the CheckpointStorageType* constants). Empty defaults to pvc.
	// +kubebuilder:default="pvc"
	Type string `json:"type"`
	// PVC configuration (used when Type=pvc)
	PVC CheckpointPVCConfig `json:"pvc"`
	// S3 configuration (used when Type=s3)
	S3 CheckpointS3Config `json:"s3"`
	// OCI configuration (used when Type=oci)
	OCI CheckpointOCIConfig `json:"oci"`
}
// CheckpointPVCConfig holds PVC storage configuration.
//
// SetDefaultsOperatorConfiguration fills empty fields with the defaults below
// regardless of the selected storage type.
type CheckpointPVCConfig struct {
	// PVCName is the name of the PVC
	// +kubebuilder:default="chrek-pvc"
	PVCName string `json:"pvcName"`
	// BasePath is the base directory within the PVC
	// +kubebuilder:default="/checkpoints"
	BasePath string `json:"basePath"`
}
// CheckpointS3Config holds S3 storage configuration.
//
// No defaults are applied to these fields.
type CheckpointS3Config struct {
	// URI is the S3 URI (s3://[endpoint/]bucket/prefix)
	URI string `json:"uri"`
	// CredentialsSecretRef is the name of the credentials secret.
	// The Helm chart only renders it when a value is configured.
	CredentialsSecretRef string `json:"credentialsSecretRef"`
}
// CheckpointOCIConfig holds OCI registry storage configuration.
//
// No defaults are applied to these fields.
type CheckpointOCIConfig struct {
	// URI is the OCI URI (oci://registry/repository)
	URI string `json:"uri"`
	// CredentialsSecretRef is the name of the docker config secret.
	// The Helm chart only renders it when a value is configured.
	CredentialsSecretRef string `json:"credentialsSecretRef"`
}
// DiscoveryConfiguration holds discovery backend settings.
type DiscoveryConfiguration struct {
	// Backend is the discovery backend: "kubernetes" or "etcd".
	// An empty value is replaced by DiscoveryBackendKubernetes in
	// SetDefaultsOperatorConfiguration.
	// +kubebuilder:default="kubernetes"
	Backend DiscoveryBackend `json:"backend"`
}
// DiscoveryBackend is the type for the discovery backend.
// It is a string type; the supported values are the constants declared below.
type DiscoveryBackend string
const (
	// DiscoveryBackendKubernetes is the Kubernetes discovery backend.
	// It is the default applied when no backend is configured.
	DiscoveryBackendKubernetes DiscoveryBackend = "kubernetes"
	// DiscoveryBackendEtcd is the etcd discovery backend
	DiscoveryBackendEtcd DiscoveryBackend = "etcd"
)
// GPUConfiguration holds GPU discovery settings.
type GPUConfiguration struct {
	// DiscoveryEnabled indicates whether GPU discovery is enabled.
	// It is a pointer so that an explicit false survives defaulting: only a nil
	// value is replaced by true in SetDefaultsOperatorConfiguration.
	// +kubebuilder:default=true
	DiscoveryEnabled *bool `json:"discoveryEnabled,omitempty"`
}
// LoggingConfiguration holds logging settings.
//
// Empty fields are replaced by the defaults below in
// SetDefaultsOperatorConfiguration.
type LoggingConfiguration struct {
	// Level is the log level (e.g., "info", "debug")
	// +kubebuilder:default="info"
	Level string `json:"level"`
	// Format is the log format (e.g., "json", "text")
	// +kubebuilder:default="json"
	Format string `json:"format"`
}
// SecurityConfiguration holds HTTP/2 and TLS settings.
// Only the HTTP/2 toggle is defined at present.
type SecurityConfiguration struct {
	// EnableHTTP2 enables HTTP/2 for metrics and webhook servers.
	// No default is applied in code, so it is false unless set explicitly.
	// +kubebuilder:default=false
	EnableHTTP2 bool `json:"enableHTTP2"`
}
//go:build !ignore_autogenerated
/*
SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by controller-gen. DO NOT EDIT.
package v1alpha1
import (
"k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
// CheckpointConfiguration and its nested Storage contain only value fields, so
// plain assignment yields an independent copy.
func (in *CheckpointConfiguration) DeepCopyInto(out *CheckpointConfiguration) {
	*out = *in
	out.Storage = in.Storage
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CheckpointConfiguration.
// It returns nil for a nil receiver; otherwise it allocates a new value and
// fills it via DeepCopyInto.
func (in *CheckpointConfiguration) DeepCopy() *CheckpointConfiguration {
	if in == nil {
		return nil
	}
	out := new(CheckpointConfiguration)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
// CheckpointOCIConfig holds only string fields, so the struct assignment is a
// complete copy.
func (in *CheckpointOCIConfig) DeepCopyInto(out *CheckpointOCIConfig) {
	*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CheckpointOCIConfig.
// It returns nil for a nil receiver; otherwise it allocates a new value and
// fills it via DeepCopyInto.
func (in *CheckpointOCIConfig) DeepCopy() *CheckpointOCIConfig {
	if in == nil {
		return nil
	}
	out := new(CheckpointOCIConfig)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
// CheckpointPVCConfig holds only string fields, so the struct assignment is a
// complete copy.
func (in *CheckpointPVCConfig) DeepCopyInto(out *CheckpointPVCConfig) {
	*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CheckpointPVCConfig.
// It returns nil for a nil receiver; otherwise it allocates a new value and
// fills it via DeepCopyInto.
func (in *CheckpointPVCConfig) DeepCopy() *CheckpointPVCConfig {
	if in == nil {
		return nil
	}
	out := new(CheckpointPVCConfig)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
// CheckpointS3Config holds only string fields, so the struct assignment is a
// complete copy.
func (in *CheckpointS3Config) DeepCopyInto(out *CheckpointS3Config) {
	*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CheckpointS3Config.
// It returns nil for a nil receiver; otherwise it allocates a new value and
// fills it via DeepCopyInto.
func (in *CheckpointS3Config) DeepCopy() *CheckpointS3Config {
	if in == nil {
		return nil
	}
	out := new(CheckpointS3Config)
	in.DeepCopyInto(out)
	return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
// The PVC, S3 and OCI sub-structs hold only string fields, so they are copied
// by plain assignment.
func (in *CheckpointStorageConfiguration) DeepCopyInto(out *CheckpointStorageConfiguration) {
	*out = *in
	out.PVC = in.PVC
	out.S3 = in.S3
	out.OCI = in.OCI
}
// DeepCopy returns a newly allocated CheckpointStorageConfiguration filled in
// by DeepCopyInto, or nil when the receiver is nil.
func (in *CheckpointStorageConfiguration) DeepCopy() *CheckpointStorageConfiguration {
	if in == nil {
		return nil
	}
	var clone CheckpointStorageConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value. The receiver must be non-nil.
func (in *DiscoveryConfiguration) DeepCopyInto(out *DiscoveryConfiguration) {
	src := *in
	*out = src
}
// DeepCopy returns a newly allocated DiscoveryConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *DiscoveryConfiguration) DeepCopy() *DiscoveryConfiguration {
	if in == nil {
		return nil
	}
	var clone DiscoveryConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value and gives out its own
// bool allocation for DiscoveryEnabled when that pointer is set. The receiver
// must be non-nil.
func (in *GPUConfiguration) DeepCopyInto(out *GPUConfiguration) {
	*out = *in
	if in.DiscoveryEnabled != nil {
		// Allocate first, then copy the value, so out does not share in's pointer.
		out.DiscoveryEnabled = new(bool)
		*out.DiscoveryEnabled = *in.DiscoveryEnabled
	}
}
// DeepCopy returns a newly allocated GPUConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *GPUConfiguration) DeepCopy() *GPUConfiguration {
	if in == nil {
		return nil
	}
	var clone GPUConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value, re-assigns
// TerminationDelay by field, and gives out its own bool allocation for Enabled
// when that pointer is set. The receiver must be non-nil.
func (in *GroveConfiguration) DeepCopyInto(out *GroveConfiguration) {
	*out = *in
	out.TerminationDelay = in.TerminationDelay
	if in.Enabled != nil {
		// Allocate first, then copy the value, so out does not share in's pointer.
		out.Enabled = new(bool)
		*out.Enabled = *in.Enabled
	}
}
// DeepCopy returns a newly allocated GroveConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *GroveConfiguration) DeepCopy() *GroveConfiguration {
	if in == nil {
		return nil
	}
	var clone GroveConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value. The receiver must be non-nil.
func (in *InfrastructureConfiguration) DeepCopyInto(out *InfrastructureConfiguration) {
	src := *in
	*out = src
}
// DeepCopy returns a newly allocated InfrastructureConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *InfrastructureConfiguration) DeepCopy() *InfrastructureConfiguration {
	if in == nil {
		return nil
	}
	var clone InfrastructureConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value. The receiver must be non-nil.
func (in *IngressConfiguration) DeepCopyInto(out *IngressConfiguration) {
	src := *in
	*out = src
}
// DeepCopy returns a newly allocated IngressConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *IngressConfiguration) DeepCopy() *IngressConfiguration {
	if in == nil {
		return nil
	}
	var clone IngressConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value and gives out its own
// bool allocation for Enabled when that pointer is set. The receiver must be
// non-nil.
func (in *KaiSchedulerConfiguration) DeepCopyInto(out *KaiSchedulerConfiguration) {
	*out = *in
	if in.Enabled != nil {
		// Allocate first, then copy the value, so out does not share in's pointer.
		out.Enabled = new(bool)
		*out.Enabled = *in.Enabled
	}
}
// DeepCopy returns a newly allocated KaiSchedulerConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *KaiSchedulerConfiguration) DeepCopy() *KaiSchedulerConfiguration {
	if in == nil {
		return nil
	}
	var clone KaiSchedulerConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value and gives out its own
// bool allocation for Enabled when that pointer is set. The receiver must be
// non-nil.
func (in *LWSConfiguration) DeepCopyInto(out *LWSConfiguration) {
	*out = *in
	if in.Enabled != nil {
		// Allocate first, then copy the value, so out does not share in's pointer.
		out.Enabled = new(bool)
		*out.Enabled = *in.Enabled
	}
}
// DeepCopy returns a newly allocated LWSConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *LWSConfiguration) DeepCopy() *LWSConfiguration {
	if in == nil {
		return nil
	}
	var clone LWSConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value. The receiver must be non-nil.
func (in *LeaderElectionConfiguration) DeepCopyInto(out *LeaderElectionConfiguration) {
	src := *in
	*out = src
}
// DeepCopy returns a newly allocated LeaderElectionConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *LeaderElectionConfiguration) DeepCopy() *LeaderElectionConfiguration {
	if in == nil {
		return nil
	}
	var clone LeaderElectionConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value. The receiver must be non-nil.
func (in *LoggingConfiguration) DeepCopyInto(out *LoggingConfiguration) {
	src := *in
	*out = src
}
// DeepCopy returns a newly allocated LoggingConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *LoggingConfiguration) DeepCopy() *LoggingConfiguration {
	if in == nil {
		return nil
	}
	var clone LoggingConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value. The receiver must be non-nil.
func (in *MPIConfiguration) DeepCopyInto(out *MPIConfiguration) {
	src := *in
	*out = src
}
// DeepCopy returns a newly allocated MPIConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *MPIConfiguration) DeepCopy() *MPIConfiguration {
	if in == nil {
		return nil
	}
	var clone MPIConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value and then re-assigns
// Server by field. The receiver must be non-nil.
func (in *MetricsServer) DeepCopyInto(out *MetricsServer) {
	src := *in
	*out = src
	out.Server = src.Server
}
// DeepCopy returns a newly allocated MetricsServer filled in by DeepCopyInto,
// or nil when the receiver is nil.
func (in *MetricsServer) DeepCopy() *MetricsServer {
	if in == nil {
		return nil
	}
	var clone MetricsServer
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value and then re-assigns
// Scope by field. The receiver must be non-nil.
func (in *NamespaceConfiguration) DeepCopyInto(out *NamespaceConfiguration) {
	src := *in
	*out = src
	out.Scope = src.Scope
}
// DeepCopy returns a newly allocated NamespaceConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *NamespaceConfiguration) DeepCopy() *NamespaceConfiguration {
	if in == nil {
		return nil
	}
	var clone NamespaceConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value and then re-assigns
// LeaseDuration and LeaseRenewInterval by field. The receiver must be non-nil.
func (in *NamespaceScopeConfiguration) DeepCopyInto(out *NamespaceScopeConfiguration) {
	src := *in
	*out = src
	out.LeaseDuration = src.LeaseDuration
	out.LeaseRenewInterval = src.LeaseRenewInterval
}
// DeepCopy returns a newly allocated NamespaceScopeConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *NamespaceScopeConfiguration) DeepCopy() *NamespaceScopeConfiguration {
	if in == nil {
		return nil
	}
	var clone NamespaceScopeConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value, re-assigns the
// plain-value sections by field, and delegates Orchestrators and GPU to their
// own DeepCopyInto methods. The receiver must be non-nil.
func (in *OperatorConfiguration) DeepCopyInto(out *OperatorConfiguration) {
	src := *in
	*out = src

	// Sections assigned by value.
	out.TypeMeta = src.TypeMeta
	out.Server = src.Server
	out.LeaderElection = src.LeaderElection
	out.Namespace = src.Namespace
	out.Infrastructure = src.Infrastructure
	out.Ingress = src.Ingress
	out.RBAC = src.RBAC
	out.MPI = src.MPI
	out.Checkpoint = src.Checkpoint
	out.Discovery = src.Discovery
	out.Logging = src.Logging
	out.Security = src.Security

	// Sections copied through their own DeepCopyInto.
	in.Orchestrators.DeepCopyInto(&out.Orchestrators)
	in.GPU.DeepCopyInto(&out.GPU)
}
// DeepCopy returns a newly allocated OperatorConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *OperatorConfiguration) DeepCopy() *OperatorConfiguration {
	if in == nil {
		return nil
	}
	var clone OperatorConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyObject returns the result of DeepCopy as a runtime.Object.
func (in *OperatorConfiguration) DeepCopyObject() runtime.Object {
	clone := in.DeepCopy()
	if clone == nil {
		// Return an untyped nil so the interface value itself is nil,
		// rather than an interface wrapping a nil pointer.
		return nil
	}
	return clone
}
// DeepCopyInto overwrites *out with the receiver's value and then delegates
// Grove, LWS and KaiScheduler to their own DeepCopyInto methods. The receiver
// must be non-nil.
func (in *OrchestratorConfiguration) DeepCopyInto(out *OrchestratorConfiguration) {
	src := *in
	*out = src

	in.Grove.DeepCopyInto(&out.Grove)
	in.LWS.DeepCopyInto(&out.LWS)
	in.KaiScheduler.DeepCopyInto(&out.KaiScheduler)
}
// DeepCopy returns a newly allocated OrchestratorConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *OrchestratorConfiguration) DeepCopy() *OrchestratorConfiguration {
	if in == nil {
		return nil
	}
	var clone OrchestratorConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value. The receiver must be non-nil.
func (in *RBACConfiguration) DeepCopyInto(out *RBACConfiguration) {
	src := *in
	*out = src
}
// DeepCopy returns a newly allocated RBACConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *RBACConfiguration) DeepCopy() *RBACConfiguration {
	if in == nil {
		return nil
	}
	var clone RBACConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value. The receiver must be non-nil.
func (in *SecurityConfiguration) DeepCopyInto(out *SecurityConfiguration) {
	src := *in
	*out = src
}
// DeepCopy returns a newly allocated SecurityConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *SecurityConfiguration) DeepCopy() *SecurityConfiguration {
	if in == nil {
		return nil
	}
	var clone SecurityConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value. The receiver must be non-nil.
func (in *Server) DeepCopyInto(out *Server) {
	src := *in
	*out = src
}
// DeepCopy returns a newly allocated Server filled in by DeepCopyInto, or nil
// when the receiver is nil.
func (in *Server) DeepCopy() *Server {
	if in == nil {
		return nil
	}
	var clone Server
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value and then re-assigns
// Metrics, HealthProbe and Webhook by field. The receiver must be non-nil.
func (in *ServerConfiguration) DeepCopyInto(out *ServerConfiguration) {
	src := *in
	*out = src
	out.Metrics = src.Metrics
	out.HealthProbe = src.HealthProbe
	out.Webhook = src.Webhook
}
// DeepCopy returns a newly allocated ServerConfiguration filled in by
// DeepCopyInto, or nil when the receiver is nil.
func (in *ServerConfiguration) DeepCopy() *ServerConfiguration {
	if in == nil {
		return nil
	}
	var clone ServerConfiguration
	in.DeepCopyInto(&clone)
	return &clone
}
// DeepCopyInto overwrites *out with the receiver's value and then re-assigns
// Server by field. The receiver must be non-nil.
func (in *WebhookServer) DeepCopyInto(out *WebhookServer) {
	src := *in
	*out = src
	out.Server = src.Server
}
// DeepCopy returns a newly allocated WebhookServer filled in by DeepCopyInto,
// or nil when the receiver is nil.
func (in *WebhookServer) DeepCopy() *WebhookServer {
	if in == nil {
		return nil
	}
	var clone WebhookServer
	in.DeepCopyInto(&clone)
	return &clone
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package validation
import (
"net/url"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
"k8s.io/apimachinery/pkg/util/validation/field"
)
// ValidateOperatorConfiguration runs every section validator over config and
// returns the accumulated field errors. A nil config yields a single Required
// error; a config with no problems yields an empty list.
func ValidateOperatorConfiguration(config *configv1alpha1.OperatorConfiguration) field.ErrorList {
	if config == nil {
		return field.ErrorList{field.Required(field.NewPath(""), "operator configuration is required")}
	}

	// The validators run in this fixed order, which is also the order of the
	// reported errors.
	sections := []field.ErrorList{
		validateServer(&config.Server, field.NewPath("server")),
		validateLeaderElection(&config.LeaderElection, field.NewPath("leaderElection")),
		validateNamespace(&config.Namespace, field.NewPath("namespace")),
		validateMPI(&config.MPI, field.NewPath("mpi")),
		validateInfrastructure(&config.Infrastructure, field.NewPath("infrastructure")),
		validateDiscovery(&config.Discovery, field.NewPath("discovery")),
		validateCheckpoint(&config.Checkpoint, field.NewPath("checkpoint")),
		validateRBAC(config),
		validateOrchestrators(&config.Orchestrators, field.NewPath("orchestrators")),
		validateIngress(&config.Ingress, field.NewPath("ingress")),
	}

	merged := field.ErrorList{}
	for _, sectionErrs := range sections {
		merged = append(merged, sectionErrs...)
	}
	return merged
}
// validateServer reports an Invalid error for each of the metrics,
// health-probe and webhook ports that lies outside the range 0-65535.
func validateServer(server *configv1alpha1.ServerConfiguration, fldPath *field.Path) field.ErrorList {
	const outOfRange = "must be between 0 and 65535"
	errs := field.ErrorList{}

	if port := server.Metrics.Port; port < 0 || port > 65535 {
		errs = append(errs, field.Invalid(fldPath.Child("metrics", "port"), port, outOfRange))
	}
	if port := server.HealthProbe.Port; port < 0 || port > 65535 {
		errs = append(errs, field.Invalid(fldPath.Child("healthProbe", "port"), port, outOfRange))
	}
	if port := server.Webhook.Port; port < 0 || port > 65535 {
		errs = append(errs, field.Invalid(fldPath.Child("webhook", "port"), port, outOfRange))
	}

	return errs
}
// validateLeaderElection requires a non-empty ID whenever leader election is
// enabled; otherwise it returns an empty list.
func validateLeaderElection(le *configv1alpha1.LeaderElectionConfiguration, fldPath *field.Path) field.ErrorList {
	if !le.Enabled || le.ID != "" {
		return field.ErrorList{}
	}
	return field.ErrorList{
		field.Required(fldPath.Child("id"), "leader election ID is required when leader election is enabled"),
	}
}
// validateNamespace checks the lease settings under "scope", but only when a
// restricted namespace is set. Both lease values must be positive, and when
// both are, the renew interval must be shorter than the lease duration.
func validateNamespace(ns *configv1alpha1.NamespaceConfiguration, fldPath *field.Path) field.ErrorList {
	errs := field.ErrorList{}
	if ns.Restricted == "" {
		// Not in namespace-restricted mode: nothing to validate here.
		return errs
	}

	scopePath := fldPath.Child("scope")
	lease := ns.Scope.LeaseDuration.Duration
	renew := ns.Scope.LeaseRenewInterval.Duration
	leasePositive, renewPositive := lease > 0, renew > 0

	if !leasePositive {
		errs = append(errs, field.Invalid(scopePath.Child("leaseDuration"), lease, "must be greater than 0 in namespace-restricted mode"))
	}
	if !renewPositive {
		errs = append(errs, field.Invalid(scopePath.Child("leaseRenewInterval"), renew, "must be greater than 0 in namespace-restricted mode"))
	}
	// The ordering check is only meaningful once both values are positive.
	if renewPositive && leasePositive && renew >= lease {
		errs = append(errs, field.Invalid(scopePath.Child("leaseRenewInterval"), renew, "must be less than leaseDuration"))
	}
	return errs
}
// validateMPI requires both the MPI SSH secret name and its namespace to be
// non-empty, reporting one Required error per missing value.
func validateMPI(mpi *configv1alpha1.MPIConfiguration, fldPath *field.Path) field.ErrorList {
	errs := field.ErrorList{}

	if len(mpi.SSHSecretName) == 0 {
		errs = append(errs, field.Required(fldPath.Child("sshSecretName"), "MPI SSH secret name is required"))
	}
	if len(mpi.SSHSecretNamespace) == 0 {
		errs = append(errs, field.Required(fldPath.Child("sshSecretNamespace"), "MPI SSH secret namespace is required"))
	}

	return errs
}
// validateInfrastructure reports an Invalid error when a non-empty
// ModelExpressURL is rejected by url.Parse. An empty URL is accepted as-is.
func validateInfrastructure(infra *configv1alpha1.InfrastructureConfiguration, fldPath *field.Path) field.ErrorList {
	errs := field.ErrorList{}

	rawURL := infra.ModelExpressURL
	if rawURL == "" {
		return errs
	}

	if _, parseErr := url.Parse(rawURL); parseErr != nil {
		errs = append(errs, field.Invalid(fldPath.Child("modelExpressURL"), rawURL, "must be a valid URL"))
	}
	return errs
}
// validateDiscovery accepts only the Kubernetes and etcd discovery backends;
// any other value produces a NotSupported error on "backend".
func validateDiscovery(discovery *configv1alpha1.DiscoveryConfiguration, fldPath *field.Path) field.ErrorList {
	switch discovery.Backend {
	case configv1alpha1.DiscoveryBackendKubernetes, configv1alpha1.DiscoveryBackendEtcd:
		return field.ErrorList{}
	default:
		return field.ErrorList{
			field.NotSupported(fldPath.Child("backend"), discovery.Backend, []string{"kubernetes", "etcd"}),
		}
	}
}
// validateCheckpoint validates the checkpoint storage settings, but only when
// checkpointing is enabled. The s3 and oci storage types each require a URI,
// pvc requires nothing further, and any other type is reported as
// NotSupported.
func validateCheckpoint(checkpoint *configv1alpha1.CheckpointConfiguration, fldPath *field.Path) field.ErrorList {
	errs := field.ErrorList{}
	if !checkpoint.Enabled {
		return errs
	}

	storage := &checkpoint.Storage
	storagePath := fldPath.Child("storage")

	if storage.Type == configv1alpha1.CheckpointStorageTypePVC {
		// PVC is the default, no additional required fields
		return errs
	}
	if storage.Type == configv1alpha1.CheckpointStorageTypeS3 {
		if storage.S3.URI == "" {
			errs = append(errs, field.Required(storagePath.Child("s3", "uri"), "S3 URI is required when storage type is s3"))
		}
		return errs
	}
	if storage.Type == configv1alpha1.CheckpointStorageTypeOCI {
		if storage.OCI.URI == "" {
			errs = append(errs, field.Required(storagePath.Child("oci", "uri"), "OCI URI is required when storage type is oci"))
		}
		return errs
	}

	// Anything else is not a recognised storage type.
	return append(errs, field.NotSupported(storagePath.Child("type"), storage.Type,
		[]string{configv1alpha1.CheckpointStorageTypePVC, configv1alpha1.CheckpointStorageTypeS3, configv1alpha1.CheckpointStorageTypeOCI}))
}
// validateRBAC checks the ClusterRole names under "rbac". It is mode-aware:
// when a restricted namespace is set it returns no errors; in cluster-wide
// mode the planner, DGDR profiling and EPP ClusterRole names are all required.
func validateRBAC(config *configv1alpha1.OperatorConfiguration) field.ErrorList {
	errs := field.ErrorList{}
	if config.Namespace.Restricted != "" {
		// Namespace-restricted mode: RBAC fields are not validated.
		return errs
	}

	// From here on we are in cluster-wide mode, so no further mode checks are needed.
	rbacPath := field.NewPath("rbac")
	rbac := &config.RBAC

	if rbac.PlannerClusterRoleName == "" {
		errs = append(errs, field.Required(rbacPath.Child("plannerClusterRoleName"), "planner ClusterRole name is required in cluster-wide mode"))
	}
	if rbac.DGDRProfilingClusterRoleName == "" {
		errs = append(errs, field.Required(rbacPath.Child("dgdrProfilingClusterRoleName"), "DGDR profiling ClusterRole name is required in cluster-wide mode"))
	}
	if rbac.EPPClusterRoleName == "" {
		errs = append(errs, field.Required(rbacPath.Child("eppClusterRoleName"), "EPP ClusterRole name is required in cluster-wide mode"))
	}
	return errs
}
// validateOrchestrators rejects a negative Grove termination delay.
func validateOrchestrators(orch *configv1alpha1.OrchestratorConfiguration, fldPath *field.Path) field.ErrorList {
	errs := field.ErrorList{}
	if delay := orch.Grove.TerminationDelay.Duration; delay < 0 {
		errs = append(errs, field.Invalid(fldPath.Child("grove", "terminationDelay"), delay, "must not be negative"))
	}
	return errs
}
// validateIngress performs no checks and always returns nil: every ingress
// setting is optional.
func validateIngress(ingress *configv1alpha1.IngressConfiguration, fldPath *field.Path) field.ErrorList {
	// Both parameters are intentionally unused.
	_, _ = ingress, fldPath
	return nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package validation
import (
"testing"
"time"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// validConfig builds the baseline cluster-wide OperatorConfiguration used by
// these tests: package defaults plus an MPI SSH secret name/namespace and a
// planner ClusterRole name.
func validConfig() *configv1alpha1.OperatorConfiguration {
	cfg := new(configv1alpha1.OperatorConfiguration)
	configv1alpha1.SetDefaultsOperatorConfiguration(cfg)

	cfg.RBAC.PlannerClusterRoleName = "planner-role"
	cfg.MPI.SSHSecretNamespace = "default"
	cfg.MPI.SSHSecretName = "mpi-ssh"
	return cfg
}
// validNamespaceScopedConfig builds the baseline namespace-restricted
// configuration: validConfig with a restricted namespace set and the planner
// ClusterRole name cleared.
func validNamespaceScopedConfig() *configv1alpha1.OperatorConfiguration {
	scoped := validConfig()
	// RBAC not required in namespace mode
	scoped.RBAC.PlannerClusterRoleName = ""
	scoped.Namespace.Restricted = "my-namespace"
	return scoped
}
// The baseline cluster-wide config must validate cleanly.
func TestValidateOperatorConfiguration_Valid(t *testing.T) {
	if got := ValidateOperatorConfiguration(validConfig()); len(got) != 0 {
		t.Errorf("expected no errors for valid config, got: %v", got)
	}
}
// The baseline namespace-restricted config must validate cleanly.
func TestValidateOperatorConfiguration_ValidNamespaceScoped(t *testing.T) {
	if got := ValidateOperatorConfiguration(validNamespaceScopedConfig()); len(got) != 0 {
		t.Errorf("expected no errors for valid namespace-scoped config, got: %v", got)
	}
}
// Clearing both MPI secret fields must yield exactly two errors.
func TestValidateOperatorConfiguration_MissingMPISecret(t *testing.T) {
	cfg := validConfig()
	cfg.MPI.SSHSecretName, cfg.MPI.SSHSecretNamespace = "", ""

	if got := ValidateOperatorConfiguration(cfg); len(got) != 2 {
		t.Errorf("expected 2 errors for missing MPI secret, got %d: %v", len(got), got)
	}
}
// An unrecognised discovery backend must yield exactly one error.
func TestValidateOperatorConfiguration_InvalidDiscoveryBackend(t *testing.T) {
	cfg := validConfig()
	cfg.Discovery.Backend = "consul"

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for invalid discovery backend, got %d: %v", len(got), got)
	}
}
// In cluster-wide mode an empty planner ClusterRole name must yield exactly one error.
func TestValidateOperatorConfiguration_ClusterWideMissingPlannerRole(t *testing.T) {
	cfg := validConfig()
	cfg.RBAC.PlannerClusterRoleName = ""

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for missing planner role in cluster-wide mode, got %d: %v", len(got), got)
	}
}
// A namespace-restricted config with no planner ClusterRole name must still validate.
func TestValidateOperatorConfiguration_NamespaceScopedNoRBACRequired(t *testing.T) {
	// Verify that RBAC is not required in namespace mode
	scoped := validNamespaceScopedConfig()

	if got := ValidateOperatorConfiguration(scoped); len(got) != 0 {
		t.Errorf("expected no errors for namespace-scoped config without RBAC, got: %v", got)
	}
}
// Zero lease duration and zero renew interval must each be reported once.
func TestValidateOperatorConfiguration_NamespaceScopedInvalidLease(t *testing.T) {
	cfg := validNamespaceScopedConfig()
	zero := metav1.Duration{Duration: 0}
	cfg.Namespace.Scope.LeaseDuration = zero
	cfg.Namespace.Scope.LeaseRenewInterval = zero

	if got := ValidateOperatorConfiguration(cfg); len(got) != 2 {
		t.Errorf("expected 2 errors for zero lease values, got %d: %v", len(got), got)
	}
}
// A renew interval longer than the lease duration must yield exactly one error.
func TestValidateOperatorConfiguration_NamespaceScopedLeaseRenewExceedsDuration(t *testing.T) {
	cfg := validNamespaceScopedConfig()
	scope := &cfg.Namespace.Scope
	scope.LeaseDuration = metav1.Duration{Duration: 10 * time.Second}
	scope.LeaseRenewInterval = metav1.Duration{Duration: 15 * time.Second}

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for lease renew > duration, got %d: %v", len(got), got)
	}
}
// Enabled checkpointing with s3 storage and no URI must yield exactly one error.
func TestValidateOperatorConfiguration_CheckpointS3MissingURI(t *testing.T) {
	cfg := validConfig()
	checkpoint := &cfg.Checkpoint
	checkpoint.Enabled = true
	checkpoint.Storage.Type = configv1alpha1.CheckpointStorageTypeS3
	checkpoint.Storage.S3.URI = ""

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for missing S3 URI, got %d: %v", len(got), got)
	}
}
// Enabled checkpointing with oci storage and no URI must yield exactly one error.
func TestValidateOperatorConfiguration_CheckpointOCIMissingURI(t *testing.T) {
	cfg := validConfig()
	checkpoint := &cfg.Checkpoint
	checkpoint.Enabled = true
	checkpoint.Storage.Type = configv1alpha1.CheckpointStorageTypeOCI
	checkpoint.Storage.OCI.URI = ""

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for missing OCI URI, got %d: %v", len(got), got)
	}
}
// Enabled checkpointing with an unrecognised storage type must yield exactly one error.
func TestValidateOperatorConfiguration_CheckpointInvalidStorageType(t *testing.T) {
	cfg := validConfig()
	checkpoint := &cfg.Checkpoint
	checkpoint.Enabled = true
	checkpoint.Storage.Type = "nfs"

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for invalid storage type, got %d: %v", len(got), got)
	}
}
// With checkpointing disabled, an unrecognised storage type must not be reported.
func TestValidateOperatorConfiguration_CheckpointDisabledSkipsValidation(t *testing.T) {
	cfg := validConfig()
	checkpoint := &cfg.Checkpoint
	checkpoint.Enabled = false
	checkpoint.Storage.Type = "invalid"

	if got := ValidateOperatorConfiguration(cfg); len(got) != 0 {
		t.Errorf("expected no errors when checkpoint is disabled, got: %v", got)
	}
}
// A ModelExpress URL that fails to parse must yield exactly one error.
func TestValidateOperatorConfiguration_InvalidModelExpressURL(t *testing.T) {
	cfg := validConfig()
	cfg.Infrastructure.ModelExpressURL = "://bad-url"

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for invalid model express URL, got %d: %v", len(got), got)
	}
}
// A metrics port above 65535 must yield exactly one error.
func TestValidateOperatorConfiguration_InvalidPort(t *testing.T) {
	cfg := validConfig()
	cfg.Server.Metrics.Port = 99999

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for invalid port, got %d: %v", len(got), got)
	}
}
// Leader election enabled with an empty ID must yield exactly one error.
func TestValidateOperatorConfiguration_LeaderElectionEnabledMissingID(t *testing.T) {
	cfg := validConfig()
	election := &cfg.LeaderElection
	election.Enabled = true
	election.ID = ""

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for missing leader election ID, got %d: %v", len(got), got)
	}
}
// A negative Grove termination delay must yield exactly one error.
func TestValidateOperatorConfiguration_NegativeTerminationDelay(t *testing.T) {
	cfg := validConfig()
	cfg.Orchestrators.Grove.TerminationDelay = metav1.Duration{Duration: -1 * time.Second}

	if got := ValidateOperatorConfiguration(cfg); len(got) != 1 {
		t.Errorf("expected 1 error for negative termination delay, got %d: %v", len(got), got)
	}
}
......@@ -23,7 +23,7 @@ import (
"context"
"crypto/tls"
"flag"
"net/url"
"fmt"
"os"
"time"
......@@ -40,9 +40,11 @@ import (
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
k8sCache "k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/cache"
"k8s.io/apimachinery/pkg/runtime"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
......@@ -55,9 +57,10 @@ import (
volcanoscheme "volcano.sh/apis/pkg/client/clientset/versioned/scheme"
semver "github.com/Masterminds/semver/v3"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
configvalidation "github.com/ai-dynamo/dynamo/deploy/operator/api/config/validation"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
nvidiacomv1beta1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1beta1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller"
commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/modelendpoint"
......@@ -76,10 +79,33 @@ import (
)
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
crdScheme = k8sruntime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
configScheme = k8sruntime.NewScheme()
)
// LoadAndValidateOperatorConfig reads the operator configuration file at path,
// decodes it into an OperatorConfiguration with the universal decoder built
// from configScheme, and validates the result. Any defaulting is whatever that
// decoder applies while decoding.
//
// It returns a non-nil error if the file cannot be read, cannot be decoded, or
// fails validation; in all three cases the returned configuration is nil.
// Every underlying error is wrapped with %w, so callers can use errors.Is and
// errors.As on the result, including on the aggregated validation errors.
func LoadAndValidateOperatorConfig(path string) (*configv1alpha1.OperatorConfiguration, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read config file %s: %w", path, err)
	}

	codecFactory := serializer.NewCodecFactory(configScheme)
	cfg := &configv1alpha1.OperatorConfiguration{}
	if err := k8sruntime.DecodeInto(codecFactory.UniversalDecoder(), data, cfg); err != nil {
		return nil, fmt.Errorf("failed to decode config file %s: %w", path, err)
	}

	// Validate the configuration. ToAggregate is non-nil here because errs is
	// non-empty; wrapping it keeps the message text identical to formatting
	// its Error() string while preserving the error chain.
	if errs := configvalidation.ValidateOperatorConfiguration(cfg); len(errs) > 0 {
		return nil, fmt.Errorf("config validation failed: %w", errs.ToAggregate())
	}

	return cfg, nil
}
func createScalesGetter(mgr ctrl.Manager) (scale.ScalesGetter, error) {
config := mgr.GetConfig()
......@@ -108,141 +134,41 @@ func createScalesGetter(mgr ctrl.Manager) (scale.ScalesGetter, error) {
return scalesGetter, nil
}
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
func initCRDSchemes() {
utilruntime.Must(clientgoscheme.AddToScheme(crdScheme))
utilruntime.Must(nvidiacomv1alpha1.AddToScheme(crdScheme))
utilruntime.Must(nvidiacomv1alpha1.AddToScheme(scheme))
utilruntime.Must(nvidiacomv1beta1.AddToScheme(crdScheme))
utilruntime.Must(lwsscheme.AddToScheme(scheme))
utilruntime.Must(lwsscheme.AddToScheme(crdScheme))
utilruntime.Must(volcanoscheme.AddToScheme(scheme))
utilruntime.Must(volcanoscheme.AddToScheme(crdScheme))
utilruntime.Must(grovev1alpha1.AddToScheme(scheme))
utilruntime.Must(grovev1alpha1.AddToScheme(crdScheme))
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
utilruntime.Must(apiextensionsv1.AddToScheme(crdScheme))
utilruntime.Must(istioclientsetscheme.AddToScheme(scheme))
utilruntime.Must(istioclientsetscheme.AddToScheme(crdScheme))
utilruntime.Must(gaiev1.Install(scheme))
utilruntime.Must(nvidiacomv1beta1.AddToScheme(scheme))
utilruntime.Must(gaiev1.Install(crdScheme))
//+kubebuilder:scaffold:scheme
}
func initConfigScheme() {
utilruntime.Must(configv1alpha1.AddToScheme(configScheme))
}
//nolint:gocyclo
func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var restrictedNamespace string
var leaderElectionID string
var leaderElectionNamespace string
var natsAddr string
var etcdAddr string
var istioVirtualServiceGateway string
var virtualServiceSupportsHTTPS bool
var ingressControllerClassName string
var ingressControllerTLSSecretName string
var ingressHostSuffix string
var groveTerminationDelay time.Duration
var modelExpressURL string
var prometheusEndpoint string
var mpiRunSecretName string
var mpiRunSecretNamespace string
var plannerClusterRoleName string
var dgdrProfilingClusterRoleName string
var eppClusterRoleName string
var namespaceScopeLeaseDuration time.Duration
var namespaceScopeLeaseRenewInterval time.Duration
initCRDSchemes()
initConfigScheme()
var configFile string
var operatorVersion string
var discoveryBackend string
var gpuDiscoveryEnabled bool
// Checkpoint configuration
var checkpointEnabled bool
var checkpointStorageType string
var checkpointPVCName string
var checkpointPVCBasePath string
var checkpointS3URI string
var checkpointS3CredentialsSecret string
var checkpointOCIURI string
var checkpointOCICredentialsSecret string
var checkpointReadyForCheckpointFilePath string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&secureMetrics, "metrics-secure", false,
"If set the metrics endpoint is served securely")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.BoolVar(&gpuDiscoveryEnabled, "gpu-discovery-enabled", true,
"Whether GPU discovery is enabled for namespace-scoped operators. When true (default), "+
"the Helm chart has provisioned a ClusterRole granting node read access for GPU hardware discovery.")
flag.StringVar(&restrictedNamespace, "restrictedNamespace", "",
"Enable resources filtering, only the resources belonging to the given namespace will be handled.")
flag.StringVar(&leaderElectionID, "leader-election-id", "", "Leader election id"+
"Id to use for the leader election.")
flag.StringVar(&leaderElectionNamespace,
"leader-election-namespace", "",
"Namespace where the leader election resource will be created (default: same as operator namespace)")
flag.StringVar(&natsAddr, "natsAddr", "", "address of the NATS server")
flag.StringVar(&etcdAddr, "etcdAddr", "", "address of the etcd server")
flag.StringVar(&istioVirtualServiceGateway, "istio-virtual-service-gateway", "",
"The name of the istio virtual service gateway to use")
flag.BoolVar(&virtualServiceSupportsHTTPS, "virtual-service-supports-https", false,
"If set, assume VirtualService endpoints are HTTPS")
flag.StringVar(&ingressControllerClassName, "ingress-controller-class-name", "",
"The name of the ingress controller class to use")
flag.StringVar(&ingressControllerTLSSecretName, "ingress-controller-tls-secret-name", "",
"The name of the ingress controller TLS secret to use")
flag.StringVar(&ingressHostSuffix, "ingress-host-suffix", "",
"The suffix to use for the ingress host")
flag.DurationVar(&groveTerminationDelay, "grove-termination-delay", consts.DefaultGroveTerminationDelay,
"The termination delay for Grove PodCliqueSets")
flag.StringVar(&modelExpressURL, "model-express-url", "",
"URL of the Model Express server to inject into all pods")
flag.StringVar(&prometheusEndpoint, "prometheus-endpoint", "",
"URL of the Prometheus endpoint to use for metrics")
flag.StringVar(&mpiRunSecretName, "mpi-run-ssh-secret-name", "",
"Name of the secret containing the SSH key for MPI Run (required)")
flag.StringVar(&mpiRunSecretNamespace, "mpi-run-ssh-secret-namespace", "",
"Namespace where the MPI SSH secret is located (required)")
flag.StringVar(&plannerClusterRoleName, "planner-cluster-role-name", "",
"Name of the ClusterRole for planner (cluster-wide mode only)")
flag.StringVar(&dgdrProfilingClusterRoleName, "dgdr-profiling-cluster-role-name", "",
"Name of the ClusterRole for DGDR profiling jobs (cluster-wide mode only)")
flag.StringVar(&eppClusterRoleName, "epp-cluster-role-name", "",
"Name of the ClusterRole for EPP (cluster-wide mode only)")
flag.DurationVar(&namespaceScopeLeaseDuration, "namespace-scope-lease-duration", 30*time.Second,
"Duration of namespace scope marker lease before expiration (namespace-restricted mode only)")
flag.DurationVar(&namespaceScopeLeaseRenewInterval, "namespace-scope-lease-renew-interval", 10*time.Second,
"Interval for renewing namespace scope marker lease (namespace-restricted mode only)")
flag.StringVar(&configFile, "config", "", "Path to operator configuration file (required)")
flag.StringVar(&operatorVersion, "operator-version", "unknown",
"Version of the operator (used in lease holder identity)")
flag.StringVar(&discoveryBackend, "discovery-backend", "kubernetes",
"Discovery backend to use: 'kubernetes' (default, uses Kubernetes API) or 'etcd' (uses ETCD)")
// Checkpoint flags
flag.BoolVar(&checkpointEnabled, "checkpoint-enabled", false,
"Enable checkpoint/restore functionality")
flag.StringVar(&checkpointStorageType, "checkpoint-storage-type", commonController.CheckpointStorageTypePVC,
"Checkpoint storage backend type: pvc, s3, or oci")
flag.StringVar(&checkpointPVCName, "checkpoint-pvc-name", "chrek-pvc",
"Name of the PVC for checkpoint storage (used when storage-type=pvc)")
flag.StringVar(&checkpointPVCBasePath, "checkpoint-pvc-base-path", "/checkpoints",
"Base path within the PVC for storing checkpoints (used when storage-type=pvc)")
flag.StringVar(&checkpointS3URI, "checkpoint-s3-uri", "",
"S3 URI for checkpoint storage: s3://[endpoint/]bucket/prefix (used when storage-type=s3)")
flag.StringVar(&checkpointS3CredentialsSecret, "checkpoint-s3-credentials-secret", "",
"Secret name containing AWS credentials (used when storage-type=s3)")
flag.StringVar(&checkpointOCIURI, "checkpoint-oci-uri", "",
"OCI URI for checkpoint storage: oci://registry/repository (used when storage-type=oci)")
flag.StringVar(&checkpointOCICredentialsSecret, "checkpoint-oci-credentials-secret", "",
"Docker config secret name for OCI registry auth (used when storage-type=oci)")
flag.StringVar(&checkpointReadyForCheckpointFilePath,
"checkpoint-ready-for-checkpoint-file-path", "/tmp/ready-for-checkpoint",
"Path written by the worker container when the model is loaded and ready for checkpointing")
opts := zap.Options{
Development: true,
}
......@@ -250,96 +176,29 @@ func main() {
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
if restrictedNamespace == "" && plannerClusterRoleName == "" {
setupLog.Error(nil, "planner-cluster-role-name is required in cluster-wide mode")
if configFile == "" {
setupLog.Error(nil, "--config flag is required")
os.Exit(1)
}
// Validate and normalize operator version to semver
if _, err := semver.NewVersion(operatorVersion); err != nil {
setupLog.Info("WARNING: operator-version is not valid semver, falling back to 0.0.0-unknown",
"provided", operatorVersion, "error", err.Error())
operatorVersion = "0.0.0-unknown"
}
setupLog.Info("Operator version configured", "version", operatorVersion)
// Validate discoveryBackend value
if discoveryBackend != "kubernetes" && discoveryBackend != "etcd" {
setupLog.Error(nil, "invalid discovery-backend value, must be 'kubernetes' or 'etcd'", "value", discoveryBackend)
os.Exit(1)
}
setupLog.Info("Discovery backend configured", "backend", discoveryBackend)
// Validate modelExpressURL if provided
if modelExpressURL != "" {
if _, err := url.Parse(modelExpressURL); err != nil {
setupLog.Error(err, "invalid model-express-url provided", "url", modelExpressURL)
os.Exit(1)
}
setupLog.Info("Model Express URL configured", "url", modelExpressURL)
}
if mpiRunSecretName == "" {
setupLog.Error(nil, "mpi-run-ssh-secret-name is required")
// Load, default, and validate operator configuration
operatorCfg, err := LoadAndValidateOperatorConfig(configFile)
if err != nil {
setupLog.Error(err, "failed to load operator configuration", "configFile", configFile)
os.Exit(1)
}
setupLog.Info("Operator configuration loaded successfully", "configFile", configFile)
if mpiRunSecretNamespace == "" {
setupLog.Error(nil, "mpi-run-ssh-secret-namespace is required")
// Validate and normalize operator version to semver
if _, err := semver.NewVersion(operatorVersion); err != nil {
setupLog.Error(err, "operator-version is not valid semver",
"provided", operatorVersion, "error", err.Error())
os.Exit(1)
}
setupLog.Info("Operator version configured", "version", operatorVersion)
ctrlConfig := commonController.Config{
RestrictedNamespace: restrictedNamespace,
Grove: commonController.GroveConfig{
Enabled: false, // Will be set after Grove discovery
TerminationDelay: groveTerminationDelay,
},
LWS: commonController.LWSConfig{
Enabled: false, // Will be set after LWS discovery
},
KaiScheduler: commonController.KaiSchedulerConfig{
Enabled: false, // Will be set after Kai-scheduler discovery
},
EtcdAddress: etcdAddr,
NatsAddress: natsAddr,
IngressConfig: commonController.IngressConfig{
VirtualServiceGateway: istioVirtualServiceGateway,
IngressControllerClassName: ingressControllerClassName,
IngressControllerTLSSecret: ingressControllerTLSSecretName,
IngressHostSuffix: ingressHostSuffix,
},
ModelExpressURL: modelExpressURL,
PrometheusEndpoint: prometheusEndpoint,
MpiRun: commonController.MpiRunConfig{
SecretName: mpiRunSecretName,
},
RBAC: commonController.RBACConfig{
PlannerClusterRoleName: plannerClusterRoleName,
DGDRProfilingClusterRoleName: dgdrProfilingClusterRoleName,
EPPClusterRoleName: eppClusterRoleName,
},
DiscoveryBackend: discoveryBackend,
Checkpoint: commonController.CheckpointConfig{
Enabled: checkpointEnabled,
ReadyForCheckpointFilePath: checkpointReadyForCheckpointFilePath,
Storage: commonController.CheckpointStorageConfig{
Type: checkpointStorageType,
PVC: commonController.CheckpointPVCConfig{
PVCName: checkpointPVCName,
BasePath: checkpointPVCBasePath,
},
S3: commonController.CheckpointS3Config{
URI: checkpointS3URI,
CredentialsSecretRef: checkpointS3CredentialsSecret,
},
OCI: commonController.CheckpointOCIConfig{
URI: checkpointOCIURI,
CredentialsSecretRef: checkpointOCICredentialsSecret,
},
},
},
}
// Initialize runtime config (will be populated after detection)
runtimeConfig := &commonController.RuntimeConfig{}
mainCtx := ctrl.SetupSignalHandler()
......@@ -355,44 +214,37 @@ func main() {
}
tlsOpts := []func(*tls.Config){}
if !enableHTTP2 {
if !operatorCfg.Security.EnableHTTP2 {
tlsOpts = append(tlsOpts, disableHTTP2)
}
webhookServer := webhook.NewServer(webhook.Options{
// Bind to all interfaces so the Service can reach the webhook server
Host: "0.0.0.0",
// Must match the port exposed by the manager container and targeted by the Service.
Port: 9443,
// Must match the mountPath of the webhook certificate secret in the Deployment.
CertDir: "/tmp/k8s-webhook-server/serving-certs",
Host: operatorCfg.Server.Webhook.Host,
Port: operatorCfg.Server.Webhook.Port,
CertDir: operatorCfg.Server.Webhook.CertDir,
TLSOpts: tlsOpts,
})
metricsBindAddr := fmt.Sprintf("%s:%d", operatorCfg.Server.Metrics.BindAddress, operatorCfg.Server.Metrics.Port)
healthProbeAddr := fmt.Sprintf(
"%s:%d", operatorCfg.Server.HealthProbe.BindAddress, operatorCfg.Server.HealthProbe.Port,
)
mgrOpts := ctrl.Options{
Scheme: scheme,
Scheme: crdScheme,
Metrics: metricsserver.Options{
BindAddress: metricsAddr,
SecureServing: secureMetrics,
BindAddress: metricsBindAddr,
SecureServing: operatorCfg.Server.Metrics.Secure,
TLSOpts: tlsOpts,
},
WebhookServer: webhookServer,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: leaderElectionID,
LeaderElectionNamespace: leaderElectionNamespace,
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
// when the Manager ends. This requires the binary to immediately end when the
// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
// speeds up voluntary leader transitions as the new leader don't have to wait
// LeaseDuration time first.
//
// In the default scaffold provided, the program ends immediately after
// the manager stops, so would be fine to enable this option. However,
// if you are doing or is intended to do any operation such as perform cleanups
// after the manager stops then its usage might be unsafe.
// LeaderElectionReleaseOnCancel: true,
HealthProbeBindAddress: healthProbeAddr,
LeaderElection: operatorCfg.LeaderElection.Enabled,
LeaderElectionID: operatorCfg.LeaderElection.ID,
LeaderElectionNamespace: operatorCfg.LeaderElection.Namespace,
}
restrictedNamespace := operatorCfg.Namespace.Restricted
if restrictedNamespace != "" {
mgrOpts.Cache.DefaultNamespaces = map[string]cache.Config{
restrictedNamespace: {},
......@@ -419,15 +271,15 @@ func main() {
// Namespace-restricted mode: Create and maintain namespace scope marker lease
setupLog.Info("Creating namespace scope marker lease manager",
"namespace", restrictedNamespace,
"leaseDuration", namespaceScopeLeaseDuration,
"renewInterval", namespaceScopeLeaseRenewInterval)
"leaseDuration", operatorCfg.Namespace.Scope.LeaseDuration.Duration,
"renewInterval", operatorCfg.Namespace.Scope.LeaseRenewInterval.Duration)
leaseManager, err = namespace_scope.NewLeaseManager(
mgr.GetConfig(),
restrictedNamespace,
operatorVersion,
namespaceScopeLeaseDuration,
namespaceScopeLeaseRenewInterval,
operatorCfg.Namespace.Scope.LeaseDuration.Duration,
operatorCfg.Namespace.Scope.LeaseRenewInterval.Duration,
)
if err != nil {
setupLog.Error(err, "unable to create namespace scope marker lease manager")
......@@ -481,34 +333,82 @@ func main() {
setupLog.Info("Namespace scope marker lease watcher started successfully")
// Pass leaseWatcher to controller config for namespace exclusion filtering
ctrlConfig.ExcludedNamespaces = leaseWatcher
// Pass leaseWatcher to runtime config for namespace exclusion filtering
runtimeConfig.ExcludedNamespaces = leaseWatcher
}
// Start resource counter background goroutine (after ExcludedNamespaces is set)
setupLog.Info("Starting resource counter")
go observability.StartResourceCounter(mainCtx, mgr.GetClient(), ctrlConfig.ExcludedNamespaces)
go observability.StartResourceCounter(mainCtx, mgr.GetClient(), runtimeConfig.ExcludedNamespaces)
// Detect orchestrators availability using discovery client
// Detect orchestrators availability using discovery client.
// Config overrides (*bool) take precedence over auto-detection:
// nil = auto-detect (backward compatible default)
// false = forcibly disabled regardless of API availability
// true = forcibly enabled; hard exit if API is not available (misconfiguration)
setupLog.Info("Detecting Grove availability...")
groveEnabled := commonController.DetectGroveAvailability(mainCtx, mgr)
ctrlConfig.Grove.Enabled = groveEnabled
groveDetected := commonController.DetectGroveAvailability(mainCtx, mgr)
switch {
case operatorCfg.Orchestrators.Grove.Enabled == nil:
runtimeConfig.GroveEnabled = groveDetected
case *operatorCfg.Orchestrators.Grove.Enabled:
if !groveDetected {
setupLog.Error(nil, "Grove is explicitly enabled in config but the Grove API group was not detected in the cluster")
os.Exit(1)
}
runtimeConfig.GroveEnabled = true
default:
setupLog.Info("Grove is explicitly disabled via config override")
runtimeConfig.GroveEnabled = false
}
setupLog.Info("Detecting LWS availability...")
lwsEnabled := commonController.DetectLWSAvailability(mainCtx, mgr)
lwsDetected := commonController.DetectLWSAvailability(mainCtx, mgr)
setupLog.Info("Detecting Volcano availability...")
volcanoEnabled := commonController.DetectVolcanoAvailability(mainCtx, mgr)
volcanoDetected := commonController.DetectVolcanoAvailability(mainCtx, mgr)
// LWS for multinode deployment usage depends on both LWS and Volcano availability
ctrlConfig.LWS.Enabled = lwsEnabled && volcanoEnabled
switch {
case operatorCfg.Orchestrators.LWS.Enabled == nil:
runtimeConfig.LWSEnabled = lwsDetected && volcanoDetected
case *operatorCfg.Orchestrators.LWS.Enabled:
if !lwsDetected {
setupLog.Error(nil, "LWS is explicitly enabled in config but the LWS API group was not detected in the cluster")
os.Exit(1)
}
if !volcanoDetected {
setupLog.Error(nil, "LWS is explicitly enabled in config but the Volcano API group was not detected in the cluster")
os.Exit(1)
}
runtimeConfig.LWSEnabled = true
default:
setupLog.Info("LWS is explicitly disabled via config override")
runtimeConfig.LWSEnabled = false
}
// Detect Kai-scheduler availability using discovery client
setupLog.Info("Detecting Kai-scheduler availability...")
kaiSchedulerEnabled := commonController.DetectKaiSchedulerAvailability(mainCtx, mgr)
ctrlConfig.KaiScheduler.Enabled = kaiSchedulerEnabled
kaiSchedulerDetected := commonController.DetectKaiSchedulerAvailability(mainCtx, mgr)
switch {
case operatorCfg.Orchestrators.KaiScheduler.Enabled == nil:
runtimeConfig.KaiSchedulerEnabled = kaiSchedulerDetected
case *operatorCfg.Orchestrators.KaiScheduler.Enabled:
if !kaiSchedulerDetected {
setupLog.Error(nil,
"Kai-scheduler is explicitly enabled in config but the scheduling.run.ai API group was not detected in the cluster",
)
os.Exit(1)
}
runtimeConfig.KaiSchedulerEnabled = true
default:
setupLog.Info("Kai-scheduler is explicitly disabled via config override")
runtimeConfig.KaiSchedulerEnabled = false
}
setupLog.Info("Detected orchestrators availability",
"grove", groveEnabled,
"lws", lwsEnabled,
"volcano", volcanoEnabled,
"kai-scheduler", kaiSchedulerEnabled,
"grove", runtimeConfig.GroveEnabled,
"lws", runtimeConfig.LWSEnabled,
"volcano", volcanoDetected,
"kai-scheduler", runtimeConfig.KaiSchedulerEnabled,
)
dockerSecretRetriever := secrets.NewDockerSecretIndexer(mgr.GetClient())
......@@ -601,14 +501,15 @@ func main() {
// Create MPI SSH SecretReplicator for cross-namespace secret replication
mpiSecretReplicator := secret.NewSecretReplicator(
mgr.GetClient(),
mpiRunSecretNamespace,
mpiRunSecretName,
operatorCfg.MPI.SSHSecretNamespace,
operatorCfg.MPI.SSHSecretName,
)
if err = (&controller.DynamoComponentDeploymentReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("dynamocomponentdeployment"),
Config: ctrlConfig,
Config: operatorCfg,
RuntimeConfig: runtimeConfig,
DockerSecretRetriever: dockerSecretRetriever,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamoComponentDeployment")
......@@ -627,7 +528,8 @@ func main() {
if err = (&controller.DynamoGraphDeploymentReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("dynamographdeployment"),
Config: ctrlConfig,
Config: operatorCfg,
RuntimeConfig: runtimeConfig,
DockerSecretRetriever: dockerSecretRetriever,
ScaleClient: scaleClient,
MPISecretReplicator: mpiSecretReplicator,
......@@ -638,20 +540,22 @@ func main() {
}
if err = (&controller.DynamoGraphDeploymentScalingAdapterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("dgdscalingadapter"),
Config: ctrlConfig,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("dgdscalingadapter"),
Config: operatorCfg,
RuntimeConfig: runtimeConfig,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DGDScalingAdapter")
os.Exit(1)
}
if err = (&controller.DynamoGraphDeploymentRequestReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("dynamographdeploymentrequest"),
Config: ctrlConfig,
RBACManager: rbacManager,
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("dynamographdeploymentrequest"),
Config: operatorCfg,
RuntimeConfig: runtimeConfig,
RBACManager: rbacManager,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamoGraphDeploymentRequest")
os.Exit(1)
......@@ -661,34 +565,31 @@ func main() {
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("dynamomodel"),
EndpointClient: modelendpoint.NewClient(),
Config: ctrlConfig,
Config: operatorCfg,
RuntimeConfig: runtimeConfig,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamoModel")
os.Exit(1)
}
if err = (&controller.CheckpointReconciler{
Client: mgr.GetClient(),
Config: ctrlConfig,
Recorder: mgr.GetEventRecorderFor("checkpoint"),
Client: mgr.GetClient(),
Config: operatorCfg,
RuntimeConfig: runtimeConfig,
Recorder: mgr.GetEventRecorderFor("checkpoint"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamoCheckpoint")
os.Exit(1)
}
ctrlConfig.GPUDiscoveryEnabled = gpuDiscoveryEnabled
// Configure webhooks with lease-based namespace exclusion
// In cluster-wide mode, inject ctrlConfig.ExcludedNamespaces (leaseWatcher) so webhooks can defer
// to namespace-restricted operators. In namespace-restricted mode, webhooks validate without checking
// leases (ExcludedNamespaces is nil). The webhooks use LeaseAwareValidator wrapper to add coordination.
isClusterWide := ctrlConfig.RestrictedNamespace == ""
isClusterWide := operatorCfg.Namespace.Restricted == ""
if isClusterWide {
setupLog.Info("Configuring webhooks with lease-based namespace exclusion for cluster-wide mode")
internalwebhook.SetExcludedNamespaces(ctrlConfig.ExcludedNamespaces)
internalwebhook.SetExcludedNamespaces(runtimeConfig.ExcludedNamespaces)
} else {
setupLog.Info("Configuring webhooks for namespace-restricted mode (no lease checking)",
"restrictedNamespace", ctrlConfig.RestrictedNamespace)
"restrictedNamespace", operatorCfg.Namespace.Restricted)
internalwebhook.SetExcludedNamespaces(nil)
}
......@@ -713,7 +614,9 @@ func main() {
os.Exit(1)
}
dgdrHandler := webhookvalidation.NewDynamoGraphDeploymentRequestHandler(isClusterWide, gpuDiscoveryEnabled)
dgdrHandler := webhookvalidation.NewDynamoGraphDeploymentRequestHandler(
isClusterWide, ptr.Deref(operatorCfg.GPU.DiscoveryEnabled, true),
)
if err = dgdrHandler.RegisterWithManager(mgr); err != nil {
setupLog.Error(err, "unable to register webhook", "webhook", "DynamoGraphDeploymentRequest")
os.Exit(1)
......
......@@ -21,9 +21,9 @@ import (
"context"
"fmt"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
controller_common "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
......@@ -47,7 +47,7 @@ func getCheckpointInfoFromCheckpoint(ckpt *nvidiacomv1alpha1.DynamoCheckpoint) *
// getPVCBasePath returns the PVC base path from storage config.
// Only applicable for PVC storage type.
func getPVCBasePath(storageConfig *controller_common.CheckpointStorageConfig) string {
func getPVCBasePath(storageConfig *configv1alpha1.CheckpointStorageConfiguration) string {
if storageConfig != nil && storageConfig.PVC.BasePath != "" {
return storageConfig.PVC.BasePath
}
......@@ -57,7 +57,7 @@ func getPVCBasePath(storageConfig *controller_common.CheckpointStorageConfig) st
// GetPVCBasePath returns the configured PVC base path from controller config.
// This is used by both CheckpointReconciler and DynamoGraphDeploymentReconciler.
// Only applicable for PVC storage type.
func GetPVCBasePath(config *controller_common.CheckpointConfig) string {
func GetPVCBasePath(config *configv1alpha1.CheckpointConfiguration) string {
if config != nil {
return getPVCBasePath(&config.Storage)
}
......@@ -154,7 +154,7 @@ func ResolveCheckpointForService(
// InjectCheckpointEnvVars adds checkpoint-related environment variables to a restored/DGD container.
// Sets PATH and HASH so the restored process knows its checkpoint identity.
// DYN_CHECKPOINT_LOCATION is reserved for future S3/OCI support.
func InjectCheckpointEnvVars(container *corev1.Container, info *CheckpointInfo, checkpointConfig *controller_common.CheckpointConfig) {
func InjectCheckpointEnvVars(container *corev1.Container, info *CheckpointInfo, checkpointConfig *configv1alpha1.CheckpointConfiguration) {
if !info.Enabled {
return
}
......@@ -163,13 +163,13 @@ func InjectCheckpointEnvVars(container *corev1.Container, info *CheckpointInfo,
// For PVC storage: inject base path so the restored process knows its checkpoint location.
// For S3/OCI (future): inject DYN_CHECKPOINT_LOCATION directly.
storageType := controller_common.CheckpointStorageTypePVC
storageType := configv1alpha1.CheckpointStorageTypePVC
if checkpointConfig != nil && checkpointConfig.Storage.Type != "" {
storageType = checkpointConfig.Storage.Type
}
switch storageType {
case controller_common.CheckpointStorageTypePVC:
case configv1alpha1.CheckpointStorageTypePVC:
basePath := ""
if checkpointConfig != nil {
basePath = getPVCBasePath(&checkpointConfig.Storage)
......@@ -340,7 +340,7 @@ func InjectPodInfoVolumeMount(container *corev1.Container) {
func InjectCheckpointIntoPodSpec(
podSpec *corev1.PodSpec,
checkpointInfo *CheckpointInfo,
checkpointConfig *controller_common.CheckpointConfig,
checkpointConfig *configv1alpha1.CheckpointConfiguration,
) error {
if checkpointInfo == nil || !checkpointInfo.Enabled {
return nil
......@@ -392,8 +392,8 @@ func InjectCheckpointIntoPodSpec(
}
// Determine storage type and compute location/path
storageType := controller_common.CheckpointStorageTypePVC // default
var storageConfig *controller_common.CheckpointStorageConfig
storageType := configv1alpha1.CheckpointStorageTypePVC // default
var storageConfig *configv1alpha1.CheckpointStorageConfiguration
if checkpointConfig != nil {
storageConfig = &checkpointConfig.Storage
if storageConfig.Type != "" {
......@@ -402,14 +402,14 @@ func InjectCheckpointIntoPodSpec(
}
switch storageType {
case controller_common.CheckpointStorageTypeS3:
case configv1alpha1.CheckpointStorageTypeS3:
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
if storageConfig == nil || storageConfig.S3.URI == "" {
return fmt.Errorf("S3 storage type selected but no S3 URI configured (set checkpoint.storage.s3.uri)")
}
info.Location = fmt.Sprintf("%s/%s.tar", storageConfig.S3.URI, info.Hash)
case controller_common.CheckpointStorageTypeOCI:
case configv1alpha1.CheckpointStorageTypeOCI:
info.StorageType = nvidiacomv1alpha1.DynamoCheckpointStorageType(storageType)
if storageConfig == nil || storageConfig.OCI.URI == "" {
return fmt.Errorf("OCI storage type selected but no OCI URI configured (set checkpoint.storage.oci.uri)")
......
......@@ -21,9 +21,9 @@ import (
"context"
"testing"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
controller_common "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
......@@ -38,12 +38,12 @@ const (
testNamespace = "default"
)
func testPVCConfig() *controller_common.CheckpointConfig {
return &controller_common.CheckpointConfig{
func testPVCConfig() *configv1alpha1.CheckpointConfiguration {
return &configv1alpha1.CheckpointConfiguration{
Enabled: true,
Storage: controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypePVC,
PVC: controller_common.CheckpointPVCConfig{
Storage: configv1alpha1.CheckpointStorageConfiguration{
Type: configv1alpha1.CheckpointStorageTypePVC,
PVC: configv1alpha1.CheckpointPVCConfig{
PVCName: "chrek-pvc",
BasePath: "/checkpoints",
},
......@@ -146,10 +146,10 @@ func TestInjectCheckpointEnvVars(t *testing.T) {
t.Run("S3 storage injects LOCATION and HASH", func(t *testing.T) {
container := &corev1.Container{}
info := &CheckpointInfo{Enabled: true, Hash: testHash, Location: "s3://bucket/" + testHash + ".tar"}
config := &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypeS3,
S3: controller_common.CheckpointS3Config{URI: "s3://bucket"},
config := &configv1alpha1.CheckpointConfiguration{
Storage: configv1alpha1.CheckpointStorageConfiguration{
Type: configv1alpha1.CheckpointStorageTypeS3,
S3: configv1alpha1.CheckpointS3Config{URI: "s3://bucket"},
},
}
InjectCheckpointEnvVars(container, info, config)
......@@ -294,22 +294,22 @@ func TestInjectCheckpointIntoPodSpec(t *testing.T) {
t.Run("S3 and OCI storage set location", func(t *testing.T) {
for _, tc := range []struct {
storageType string
config controller_common.CheckpointStorageConfig
config configv1alpha1.CheckpointStorageConfiguration
wantLoc string
}{
{"s3", controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypeS3,
S3: controller_common.CheckpointS3Config{URI: "s3://bucket/prefix"},
{"s3", configv1alpha1.CheckpointStorageConfiguration{
Type: configv1alpha1.CheckpointStorageTypeS3,
S3: configv1alpha1.CheckpointS3Config{URI: "s3://bucket/prefix"},
}, "s3://bucket/prefix/" + testHash + ".tar"},
{"oci", controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypeOCI,
OCI: controller_common.CheckpointOCIConfig{URI: "oci://registry/repo"},
{"oci", configv1alpha1.CheckpointStorageConfiguration{
Type: configv1alpha1.CheckpointStorageTypeOCI,
OCI: configv1alpha1.CheckpointOCIConfig{URI: "oci://registry/repo"},
}, "oci://registry/repo:" + testHash},
} {
t.Run(tc.storageType, func(t *testing.T) {
podSpec := testPodSpec()
info := &CheckpointInfo{Enabled: true, Hash: testHash}
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, info, &controller_common.CheckpointConfig{Storage: tc.config}))
require.NoError(t, InjectCheckpointIntoPodSpec(podSpec, info, &configv1alpha1.CheckpointConfiguration{Storage: tc.config}))
assert.Equal(t, tc.wantLoc, info.Location)
})
}
......@@ -320,22 +320,22 @@ func TestInjectCheckpointIntoPodSpec(t *testing.T) {
name string
podSpec *corev1.PodSpec
info *CheckpointInfo
config *controller_common.CheckpointConfig
config *configv1alpha1.CheckpointConfiguration
errMsg string
}{
{"hash empty and identity nil", testPodSpec(), &CheckpointInfo{Enabled: true}, testPVCConfig(), "identity is nil"},
{"no containers", &corev1.PodSpec{}, testInfo(), testPVCConfig(), "no container found"},
{"PVC name missing", testPodSpec(), testInfo(), &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{Type: "pvc", PVC: controller_common.CheckpointPVCConfig{BasePath: "/checkpoints"}},
{"PVC name missing", testPodSpec(), testInfo(), &configv1alpha1.CheckpointConfiguration{
Storage: configv1alpha1.CheckpointStorageConfiguration{Type: "pvc", PVC: configv1alpha1.CheckpointPVCConfig{BasePath: "/checkpoints"}},
}, "no PVC name"},
{"PVC base path missing", testPodSpec(), testInfo(), &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{Type: "pvc", PVC: controller_common.CheckpointPVCConfig{PVCName: "chrek-pvc"}},
{"PVC base path missing", testPodSpec(), testInfo(), &configv1alpha1.CheckpointConfiguration{
Storage: configv1alpha1.CheckpointStorageConfiguration{Type: "pvc", PVC: configv1alpha1.CheckpointPVCConfig{PVCName: "chrek-pvc"}},
}, "no PVC base path"},
{"S3 URI missing", testPodSpec(), testInfo(), &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{Type: "s3"},
{"S3 URI missing", testPodSpec(), testInfo(), &configv1alpha1.CheckpointConfiguration{
Storage: configv1alpha1.CheckpointStorageConfiguration{Type: "s3"},
}, "S3"},
{"OCI URI missing", testPodSpec(), testInfo(), &controller_common.CheckpointConfig{
Storage: controller_common.CheckpointStorageConfig{Type: "oci"},
{"OCI URI missing", testPodSpec(), testInfo(), &configv1alpha1.CheckpointConfiguration{
Storage: configv1alpha1.CheckpointStorageConfiguration{Type: "oci"},
}, "OCI"},
} {
t.Run(tc.name, func(t *testing.T) {
......
......@@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commoncontroller "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
......@@ -70,7 +71,8 @@ type DynamoModelReconciler struct {
client.Client
Recorder record.EventRecorder
EndpointClient *modelendpoint.Client
Config commoncontroller.Config
Config *configv1alpha1.OperatorConfiguration
RuntimeConfig *commoncontroller.RuntimeConfig
}
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamomodels,verbs=get;list;watch;create;update;patch;delete
......@@ -264,7 +266,7 @@ func (r *DynamoModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
GenericFunc: func(e event.GenericEvent) bool { return false },
}),
).
WithEventFilter(commoncontroller.EphemeralDeploymentEventFilter(r.Config)). // set the event filter to ignore resources handled by other controllers in namespace-restricted mode
WithEventFilter(commoncontroller.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig)). // set the event filter to ignore resources handled by other controllers in namespace-restricted mode
Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoModel))
}
......
......@@ -35,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
......@@ -44,8 +45,9 @@ import (
// CheckpointReconciler reconciles a DynamoCheckpoint object
//
// NOTE(review): this listing is a rendered commit diff whose +/- markers were
// stripped, so Config and Recorder each appear twice below. Presumably the
// value-typed Config and the first Recorder are the pre-change lines and the
// pointer-typed Config, RuntimeConfig and second Recorder are the post-change
// lines — confirm against the repository before relying on this.
type CheckpointReconciler struct {
// Embedded client.Client; its methods are promoted onto the reconciler.
client.Client
// Operator configuration held by value (presumably the removed variant — see note above).
Config commonController.Config
Recorder record.EventRecorder
// Pointer to the versioned operator configuration; getCheckpointStorageType
// reads Checkpoint.Storage.Type from it.
Config *configv1alpha1.OperatorConfiguration
// Passed together with Config to EphemeralDeploymentEventFilter in SetupWithManager.
RuntimeConfig *commonController.RuntimeConfig
// Event recorder for this reconciler.
Recorder record.EventRecorder
}
// Helper function to compute checkpoint location from operator config
......@@ -56,7 +58,7 @@ func (r *CheckpointReconciler) getCheckpointLocation(identityHash string) string
// Helper function to get checkpoint storage type from operator config
//
// The result is converted to nvidiacomv1alpha1.DynamoCheckpointStorageType.
//
// NOTE(review): two return statements are listed because this text is a
// rendered diff without +/- markers. Presumably the first (the constant
// commonController.CheckpointStorageTypePVC) is the removed line and the
// second (r.Config.Checkpoint.Storage.Type) is its replacement — confirm
// against the repository. As literally written, only the first return runs.
func (r *CheckpointReconciler) getCheckpointStorageType() nvidiacomv1alpha1.DynamoCheckpointStorageType {
// Always reports the PVC storage type, regardless of configuration.
return nvidiacomv1alpha1.DynamoCheckpointStorageType(commonController.CheckpointStorageTypePVC)
// Reports whatever storage type the operator configuration specifies.
return nvidiacomv1alpha1.DynamoCheckpointStorageType(r.Config.Checkpoint.Storage.Type)
}
// GetRecorder returns the event recorder (implements controller_common.Reconciler interface)
......@@ -385,6 +387,6 @@ func (r *CheckpointReconciler) SetupWithManager(mgr ctrl.Manager) error {
UpdateFunc: func(ue event.UpdateEvent) bool { return true },
GenericFunc: func(ge event.GenericEvent) bool { return true },
})).
WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config)).
WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig)).
Complete(r)
}
......@@ -21,9 +21,9 @@ import (
"context"
"testing"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
controller_common "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
......@@ -50,14 +50,14 @@ func checkpointTestScheme() *runtime.Scheme {
return s
}
func checkpointTestConfig() controller_common.Config {
return controller_common.Config{
Checkpoint: controller_common.CheckpointConfig{
func checkpointTestConfig() *configv1alpha1.OperatorConfiguration {
return &configv1alpha1.OperatorConfiguration{
Checkpoint: configv1alpha1.CheckpointConfiguration{
Enabled: true,
ReadyForCheckpointFilePath: "/tmp/ready-for-checkpoint",
Storage: controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypePVC,
PVC: controller_common.CheckpointPVCConfig{
Storage: configv1alpha1.CheckpointStorageConfiguration{
Type: configv1alpha1.CheckpointStorageTypePVC,
PVC: configv1alpha1.CheckpointPVCConfig{
PVCName: "chrek-pvc",
BasePath: "/checkpoints",
},
......
......@@ -33,6 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"emperror.dev/errors"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/common"
......@@ -69,7 +70,8 @@ const (
// DynamoComponentDeploymentReconciler bundles the dependencies used by the
// DynamoComponentDeployment reconcile methods (Reconcile, generateService,
// createOrUpdateOrDeleteIngress, SetupWithManager).
//
// NOTE(review): this listing is a rendered commit diff whose +/- markers were
// stripped, so Config appears twice. Presumably the value-typed Config is the
// pre-change line and the pointer-typed Config plus RuntimeConfig are the
// post-change lines — confirm against the repository.
type DynamoComponentDeploymentReconciler struct {
// Embedded client.Client; its methods are promoted onto the reconciler.
client.Client
// Event recorder for this reconciler.
Recorder record.EventRecorder
// Operator configuration held by value (presumably the removed variant — see note above).
Config commonController.Config
// Pointer to the versioned operator configuration; the reconcile methods read
// Ingress.UseVirtualService() and Discovery.Backend from it.
Config *configv1alpha1.OperatorConfiguration
// Runtime-only settings; LWSEnabled is checked in Reconcile and SetupWithManager.
RuntimeConfig *commonController.RuntimeConfig
// presumably resolves image pull secret names; the test mock exposes
// GetSecretsFunc(namespace, imageName) ([]string, error) — verify the interface.
DockerSecretRetriever dockerSecretRetriever
}
......@@ -180,7 +182,7 @@ func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req
// Create the appropriate workload resource based on deployment type
var componentReconcileResult ComponentReconcileResult
if r.Config.LWS.Enabled && dynamoComponentDeployment.IsMultinode() {
if r.RuntimeConfig.LWSEnabled && dynamoComponentDeployment.IsMultinode() {
componentReconcileResult, err = r.reconcileLeaderWorkerSetResources(ctx, dynamoComponentDeployment)
} else {
componentReconcileResult, err = r.reconcileDeploymentResources(ctx, dynamoComponentDeployment)
......@@ -821,7 +823,7 @@ func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteIngress(ctx
if err != nil {
return false, err
}
if r.Config.IngressConfig.UseVirtualService() {
if r.Config.Ingress.UseVirtualService() {
modified_, _, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
return r.generateVirtualService(ctx, opt)
})
......@@ -1122,7 +1124,7 @@ func (r *DynamoComponentDeploymentReconciler) generateService(opt generateResour
},
}
isK8sDiscovery := r.Config.IsK8sDiscoveryEnabled(dcd.Spec.Annotations)
isK8sDiscovery := commonController.IsK8sDiscoveryEnabled(r.Config.Discovery.Backend, dcd.Spec.Annotations)
if !(isK8sDiscovery || dcd.IsFrontendComponent()) {
return deleteStub, true, nil
......@@ -1166,9 +1168,9 @@ func (r *DynamoComponentDeploymentReconciler) SetupWithManager(mgr ctrl.Manager)
Owns(&corev1.Service{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&networkingv1.Ingress{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&corev1.PersistentVolumeClaim{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config))
WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig))
if r.Config.LWS.Enabled {
if r.RuntimeConfig.LWSEnabled {
m.Owns(&leaderworkersetv1.LeaderWorkerSet{}, builder.WithPredicates(predicate.Funcs{
// ignore creation cause we don't want to be called again after we create the LeaderWorkerSet
CreateFunc: func(ce event.CreateEvent) bool { return false },
......@@ -1185,7 +1187,7 @@ func (r *DynamoComponentDeploymentReconciler) SetupWithManager(mgr ctrl.Manager)
}))
}
if r.Config.IngressConfig.UseVirtualService() {
if r.Config.Ingress.UseVirtualService() {
m.Owns(&networkingv1beta1.VirtualService{}, builder.WithPredicates(predicate.GenerationChangedPredicate{}))
}
m.Owns(&autoscalingv2.HorizontalPodAutoscaler{})
......
......@@ -23,6 +23,7 @@ import (
"context"
"testing"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
......@@ -436,7 +437,7 @@ func TestDynamoComponentDeploymentReconciler_generateVolcanoPodGroup(t *testing.
type fields struct {
Client client.Client
Recorder record.EventRecorder
Config controller_common.Config
Config *configv1alpha1.OperatorConfiguration
}
type args struct {
ctx context.Context
......@@ -579,7 +580,8 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
type fields struct {
Client client.Client
Recorder record.EventRecorder
Config controller_common.Config
Config *configv1alpha1.OperatorConfiguration
RuntimeConfig *controller_common.RuntimeConfig
DockerSecretRetriever *mockDockerSecretRetriever
}
type args struct {
......@@ -600,8 +602,9 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
{
name: "generateLeaderWorkerSet - nominal case",
fields: fields{
Recorder: record.NewFakeRecorder(100),
Config: controller_common.Config{}, // Provide default or test-specific config
Recorder: record.NewFakeRecorder(100),
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
......@@ -973,7 +976,14 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
{
name: "nil instanceID", // This test should fail before r.List is called in generatePodTemplateSpec
fields: fields{
Recorder: record.NewFakeRecorder(100),
Recorder: record.NewFakeRecorder(100),
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
},
},
},
args: args{
ctx: context.Background(),
......@@ -1016,7 +1026,14 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
{
name: "error from generateLeaderPodTemplateSpec", // This case involves an error from generatePodTemplateSpec
fields: fields{
Recorder: record.NewFakeRecorder(100),
Recorder: record.NewFakeRecorder(100),
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
},
},
},
args: args{
ctx: context.Background(),
......@@ -1095,6 +1112,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
Client: fakeKubeClient, // Use the fake client
Recorder: tt.fields.Recorder,
Config: tt.fields.Config,
RuntimeConfig: tt.fields.RuntimeConfig,
DockerSecretRetriever: tt.fields.DockerSecretRetriever,
// Scheme: s, // Pass scheme if reconciler uses it directly, often client uses it
}
......@@ -1159,9 +1177,10 @@ func TestDynamoComponentDeploymentReconciler_createOrUpdateOrDeleteDeployments_R
// Set up reconciler
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoComponentDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
Client: fakeKubeClient,
Recorder: recorder,
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
......@@ -1273,12 +1292,12 @@ func TestDynamoComponentDeploymentReconciler_generatePodTemplateSpec_RestoreLabe
WithScheme(s).
WithObjects(objs...).
Build(),
Config: controller_common.Config{
Checkpoint: controller_common.CheckpointConfig{
Config: &configv1alpha1.OperatorConfiguration{
Checkpoint: configv1alpha1.CheckpointConfiguration{
Enabled: true,
Storage: controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypePVC,
PVC: controller_common.CheckpointPVCConfig{
Storage: configv1alpha1.CheckpointStorageConfiguration{
Type: configv1alpha1.CheckpointStorageTypePVC,
PVC: configv1alpha1.CheckpointPVCConfig{
PVCName: "chrek-pvc",
BasePath: "/checkpoints",
},
......@@ -1405,12 +1424,12 @@ func TestDynamoComponentDeploymentReconciler_generateDeployment_RestoreStrategy(
WithScheme(s).
WithObjects(objs...).
Build(),
Config: controller_common.Config{
Checkpoint: controller_common.CheckpointConfig{
Config: &configv1alpha1.OperatorConfiguration{
Checkpoint: configv1alpha1.CheckpointConfiguration{
Enabled: true,
Storage: controller_common.CheckpointStorageConfig{
Type: controller_common.CheckpointStorageTypePVC,
PVC: controller_common.CheckpointPVCConfig{
Storage: configv1alpha1.CheckpointStorageConfiguration{
Type: configv1alpha1.CheckpointStorageTypePVC,
PVC: configv1alpha1.CheckpointPVCConfig{
PVCName: "chrek-pvc",
BasePath: "/checkpoints",
},
......@@ -1520,9 +1539,10 @@ func Test_createOrUpdateOrDeleteDeployments_K8sAPIDefaults(t *testing.T) {
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoComponentDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
Client: fakeKubeClient,
Recorder: recorder,
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
......@@ -1856,9 +1876,10 @@ func Test_reconcileLeaderWorkerSetResources(t *testing.T) {
// Set up reconciler
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoComponentDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
Client: fakeKubeClient,
Recorder: recorder,
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
......@@ -2026,9 +2047,10 @@ func Test_reconcileDeploymentResources(t *testing.T) {
// Set up reconciler
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoComponentDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
Client: fakeKubeClient,
Recorder: recorder,
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
......@@ -2446,9 +2468,10 @@ func Test_generateDeployment_Strategy(t *testing.T) {
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoComponentDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
Client: fakeKubeClient,
Recorder: recorder,
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
......
......@@ -47,6 +47,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commoncontroller "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
......@@ -68,7 +69,8 @@ type rbacManager interface {
// DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object
type DynamoGraphDeploymentReconciler struct {
client.Client
Config commoncontroller.Config
Config *configv1alpha1.OperatorConfiguration
RuntimeConfig *commoncontroller.RuntimeConfig
Recorder record.EventRecorder
DockerSecretRetriever dockerSecretRetriever
ScaleClient scale.ScalesGetter
......@@ -247,7 +249,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
logger := log.FromContext(ctx)
// Ensure planner RBAC exists in cluster-wide mode
if r.Config.RestrictedNamespace == "" {
if r.Config.Namespace.Restricted == "" {
if r.RBACManager == nil {
return ReconcileResult{}, fmt.Errorf("RBAC manager not initialized in cluster-wide mode")
}
......@@ -330,9 +332,9 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
}
// return error early if Grove and LWS is not available for multinode
if !r.isGrovePathway(dynamoDeployment) && hasMultinode && !r.Config.LWS.Enabled {
if !r.isGrovePathway(dynamoDeployment) && hasMultinode && !r.RuntimeConfig.LWSEnabled {
err := fmt.Errorf("no multinode orchestrator available")
logger.Error(err, err.Error(), "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled)
logger.Error(err, err.Error(), "hasMultinode", hasMultinode, "lwsEnabled", r.RuntimeConfig.LWSEnabled)
return ReconcileResult{}, fmt.Errorf("failed to reconcile Dynamo components deployments: %w", err)
}
......@@ -341,10 +343,10 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
var result ReconcileResult
if r.isGrovePathway(dynamoDeployment) {
logger.Info("Reconciling Grove resources", "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled)
logger.Info("Reconciling Grove resources", "hasMultinode", hasMultinode, "lwsEnabled", r.RuntimeConfig.LWSEnabled)
result, err = r.reconcileGroveResources(ctx, dynamoDeployment, restartState, checkpointInfos)
} else {
logger.Info("Reconciling Dynamo components deployments", "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled)
logger.Info("Reconciling Dynamo components deployments", "hasMultinode", hasMultinode, "lwsEnabled", r.RuntimeConfig.LWSEnabled)
result, err = r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment, restartState)
}
if err != nil {
......@@ -364,7 +366,7 @@ func (r *DynamoGraphDeploymentReconciler) isGrovePathway(dgd *nvidiacomv1alpha1.
enableGrove = false
}
return enableGrove && r.Config.Grove.Enabled
return enableGrove && r.RuntimeConfig.GroveEnabled
}
func (r *DynamoGraphDeploymentReconciler) getUpdatedInProgress(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment, inProgress []string) []string {
......@@ -472,7 +474,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGrovePodCliqueSet(ctx context
}
// generate the dynamoComponentsDeployments from the config
grovePodCliqueSet, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever, restartState, existingRestartAnnotations, checkpointInfos)
grovePodCliqueSet, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.RuntimeConfig, r.DockerSecretRetriever, restartState, existingRestartAnnotations, checkpointInfos)
if err != nil {
logger.Error(err, "failed to generate the Grove GangSet")
return nil, fmt.Errorf("failed to generate the Grove GangSet: %w", err)
......@@ -604,7 +606,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
// if k8s discovery is enabled, create a service for each component
// else, only create for the frontend component
isK8sDiscoveryEnabled := r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations)
isK8sDiscoveryEnabled := commoncontroller.IsK8sDiscoveryEnabled(r.Config.Discovery.Backend, dynamoDeployment.Annotations)
if isK8sDiscoveryEnabled || component.ComponentType == consts.ComponentTypeFrontend {
if component.DynamoNamespace == nil {
return ReconcileResult{}, fmt.Errorf("expected component %s to have a dynamoNamespace", componentName)
......@@ -643,7 +645,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
if component.ComponentType == consts.ComponentTypeFrontend {
// generate the main component ingress
ingressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
ingressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.Ingress)
if component.Ingress != nil {
ingressSpec = *component.Ingress
}
......@@ -670,7 +672,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
resources = append(resources, mainComponentIngressAsResource)
}
// generate the main component virtual service
if r.Config.IngressConfig.UseVirtualService() {
if r.Config.Ingress.UseVirtualService() {
mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDCDResourceName(dynamoDeployment, componentName, ""), dynamoDeployment.Namespace, ingressSpec)
_, syncedMainComponentVirtualService, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
if !ingressSpec.IsVirtualServiceEnabled() {
......@@ -1008,7 +1010,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(c
resources := []Resource{}
logger := log.FromContext(ctx)
defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.Ingress)
rollingUpdateCtx := r.buildRollingUpdateContext(ctx, dynamoDeployment)
......@@ -1139,7 +1141,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcilePVC(ctx context.Context, dyna
func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
logger := log.FromContext(ctx)
if !r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations) {
if !commoncontroller.IsK8sDiscoveryEnabled(r.Config.Discovery.Backend, dynamoDeployment.Annotations) {
logger.Info("K8s discovery is not enabled")
return nil
}
......@@ -1571,8 +1573,8 @@ func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) err
UpdateFunc: func(de event.UpdateEvent) bool { return true },
GenericFunc: func(ge event.GenericEvent) bool { return true },
})).
WithEventFilter(commoncontroller.EphemeralDeploymentEventFilter(r.Config))
if r.Config.Grove.Enabled {
WithEventFilter(commoncontroller.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig))
if r.RuntimeConfig.GroveEnabled {
ctrlBuilder = ctrlBuilder.Owns(&grovev1alpha1.PodCliqueSet{}, builder.WithPredicates(predicate.Funcs{
// ignore creation cause we don't want to be called again after we create the pod gang set
CreateFunc: func(ce event.CreateEvent) bool { return false },
......
......@@ -21,6 +21,7 @@ import (
"context"
"testing"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
......@@ -673,10 +674,11 @@ func Test_reconcileGroveResources(t *testing.T) {
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoGraphDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
ScaleClient: &mockScaleClient{},
Client: fakeKubeClient,
Recorder: recorder,
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
ScaleClient: &mockScaleClient{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
......@@ -1485,10 +1487,9 @@ func Test_computeRestartStatus(t *testing.T) {
reconciler := &DynamoGraphDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{
Grove: controller_common.GroveConfig{
Enabled: tt.groveEnabled,
},
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{
GroveEnabled: tt.groveEnabled,
},
}
......@@ -2059,9 +2060,10 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
recorder := record.NewFakeRecorder(100)
reconciler := &DynamoGraphDeploymentReconciler{
Client: fakeKubeClient,
Recorder: recorder,
Config: controller_common.Config{},
Client: fakeKubeClient,
Recorder: recorder,
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
}
result, err := reconciler.reconcileDynamoComponentsDeployments(ctx, dgd, nil)
......
......@@ -32,8 +32,10 @@ import (
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
)
......@@ -67,8 +69,15 @@ func createTestReconciler(objs ...runtime.Object) *DynamoGraphDeploymentReconcil
Build()
return &DynamoGraphDeploymentReconciler{
Client: fakeClient,
Recorder: record.NewFakeRecorder(10),
Client: fakeClient,
Recorder: record.NewFakeRecorder(10),
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &commonController.RuntimeConfig{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
return []string{}, nil
},
},
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment