Unverified Commit 677e9b7f authored by Julien Mancuso's avatar Julien Mancuso Committed by GitHub
Browse files

feat: allow for cluster-wide and namespace restricted operators in same cluster (#3966)


Signed-off-by: default avatarJulien Mancuso <jmancuso@nvidia.com>
parent e30a3054
......@@ -15,32 +15,16 @@
{{/*
Validation to prevent operator conflicts
Prevents all conflict scenarios:
1. Multiple cluster-wide operators (multiple cluster managers)
2. Namespace-restricted operator when cluster-wide exists (both would manage same resources)
3. Cluster-wide operator when namespace-restricted exist (both would manage same resources)
With the namespace scope marker lease mechanism, cluster-wide and namespace-restricted
operators can now coexist safely. The cluster-wide operator automatically excludes
namespaces that have namespace-restricted operators.
This validation only prevents:
1. Multiple cluster-wide operators (would compete for the same resources)
*/}}
{{- define "dynamo-operator.validateClusterWideInstallation" -}}
{{- $currentReleaseName := .Release.Name -}}
{{/* Check for existing namespace-restricted operators (only when installing cluster-wide) */}}
{{- if not .Values.namespaceRestriction.enabled -}}
{{- $allRoles := lookup "rbac.authorization.k8s.io/v1" "Role" "" "" -}}
{{- $namespaceRestrictedOperators := list -}}
{{- if $allRoles -}}
{{- range $role := $allRoles.items -}}
{{- if and (contains "-dynamo-operator-" $role.metadata.name) (hasSuffix "-manager-role" $role.metadata.name) -}}
{{- $namespaceRestrictedOperators = append $namespaceRestrictedOperators $role.metadata.namespace -}}
{{- end -}}
{{- end -}}
{{- end -}}
{{- if $namespaceRestrictedOperators -}}
{{- fail (printf "VALIDATION ERROR: Cannot install cluster-wide Dynamo operator. Found existing namespace-restricted Dynamo operators in namespaces: %s. This would create resource conflicts as both the cluster-wide operator and namespace-restricted operators would manage the same DGDs/DCDs. Either:\n1. Use one of the existing namespace-restricted operators for your specific namespace, or\n2. Uninstall all existing namespace-restricted operators first, or\n3. Install this operator in namespace-restricted mode: --set dynamo-operator.namespaceRestriction.enabled=true" (join ", " ($namespaceRestrictedOperators | uniq))) -}}
{{- end -}}
{{- end -}}
{{/* Check for existing ClusterRoles that would indicate other cluster-wide installations */}}
{{- $existingClusterRoles := lookup "rbac.authorization.k8s.io/v1" "ClusterRole" "" "" -}}
{{- $foundExistingClusterWideOperator := false -}}
......@@ -82,22 +66,17 @@ Prevents all conflict scenarios:
{{- end -}}
{{- if $foundExistingClusterWideOperator -}}
{{/* Only prevent multiple cluster-wide operators, not namespace-restricted */}}
{{- if not .Values.namespaceRestriction.enabled -}}
{{- $uninstallCmd := printf "helm uninstall %s" $existingOperatorRelease -}}
{{- if $existingOperatorNamespace -}}
{{- $uninstallCmd = printf "helm uninstall %s -n %s" $existingOperatorRelease $existingOperatorNamespace -}}
{{- end -}}
{{- if .Values.namespaceRestriction.enabled -}}
{{- if $existingOperatorNamespace -}}
{{- fail (printf "VALIDATION ERROR: Found existing cluster-wide Dynamo operator from release '%s' in namespace '%s' (ClusterRole: %s). Cannot install namespace-restricted operator because the cluster-wide operator already manages resources in all namespaces, including the target namespace. This would create resource conflicts. Either:\n1. Use the existing cluster-wide operator, or\n2. Uninstall the existing cluster-wide operator first: %s" $existingOperatorRelease $existingOperatorNamespace $existingOperatorRoleName $uninstallCmd) -}}
{{- else -}}
{{- fail (printf "VALIDATION ERROR: Found existing cluster-wide Dynamo operator from release '%s' (ClusterRole: %s). Cannot install namespace-restricted operator because the cluster-wide operator already manages resources in all namespaces, including the target namespace. This would create resource conflicts. Either:\n1. Use the existing cluster-wide operator, or\n2. Uninstall the existing cluster-wide operator first: %s" $existingOperatorRelease $existingOperatorRoleName $uninstallCmd) -}}
{{- end -}}
{{- else -}}
{{- if $existingOperatorNamespace -}}
{{- fail (printf "VALIDATION ERROR: Found existing cluster-wide Dynamo operator from release '%s' in namespace '%s' (ClusterRole: %s). Only one cluster-wide Dynamo operator should be deployed per cluster. Either:\n1. Use the existing cluster-wide operator (no need to install another), or\n2. Uninstall the existing cluster-wide operator first: %s" $existingOperatorRelease $existingOperatorNamespace $existingOperatorRoleName $uninstallCmd) -}}
{{- fail (printf "VALIDATION ERROR: Found existing cluster-wide Dynamo operator from release '%s' in namespace '%s' (ClusterRole: %s). Only one cluster-wide Dynamo operator should be deployed per cluster. Either:\n1. Use the existing cluster-wide operator (no need to install another), or\n2. Uninstall the existing cluster-wide operator first: %s\n\nNote: You can install namespace-restricted operators alongside the cluster-wide operator using: --set namespaceRestriction.enabled=true" $existingOperatorRelease $existingOperatorNamespace $existingOperatorRoleName $uninstallCmd) -}}
{{- else -}}
{{- fail (printf "VALIDATION ERROR: Found existing cluster-wide Dynamo operator from release '%s' (ClusterRole: %s). Only one cluster-wide Dynamo operator should be deployed per cluster. Either:\n1. Use the existing cluster-wide operator (no need to install another), or\n2. Uninstall the existing cluster-wide operator first: %s" $existingOperatorRelease $existingOperatorRoleName $uninstallCmd) -}}
{{- fail (printf "VALIDATION ERROR: Found existing cluster-wide Dynamo operator from release '%s' (ClusterRole: %s). Only one cluster-wide Dynamo operator should be deployed per cluster. Either:\n1. Use the existing cluster-wide operator (no need to install another), or\n2. Uninstall the existing cluster-wide operator first: %s\n\nNote: You can install namespace-restricted operators alongside the cluster-wide operator using: --set namespaceRestriction.enabled=true" $existingOperatorRelease $existingOperatorRoleName $uninstallCmd) -}}
{{- end -}}
{{- end -}}
{{- end -}}
......
......@@ -128,6 +128,13 @@ spec:
- --dgdr-profiling-cluster-role-name={{ include "dynamo-operator.fullname" . }}-dgdr-profiling
- --planner-cluster-role-name={{ include "dynamo-operator.fullname" . }}-planner
{{- end }}
{{- if .Values.namespaceRestriction.enabled }}
{{- if .Values.namespaceRestriction.lease }}
- --namespace-scope-lease-duration={{ .Values.namespaceRestriction.lease.duration }}
- --namespace-scope-lease-renew-interval={{ .Values.namespaceRestriction.lease.renewInterval }}
{{- end }}
{{- end }}
- --operator-version={{ .Values.controllerManager.manager.image.tag | default .Chart.AppVersion }}
command:
- /manager
env:
......
......@@ -35,6 +35,13 @@ dynamo-operator:
enabled: false
# -- Target namespace for operator deployment (leave empty for current namespace)
targetNamespace:
# Namespace scope marker lease configuration (used to prevent conflicts when running both cluster-wide and namespace-restricted operators)
lease:
# Duration before the namespace scope marker lease expires if not renewed (namespace-restricted mode only). When a namespace-restricted operator is running, it creates a lease in its namespace. The cluster-wide operator detects this lease and excludes that namespace from processing. If the namespace operator stops renewing the lease (e.g., crashes), the lease expires and the cluster-wide operator automatically resumes processing that namespace.
duration: 30s
# Interval for renewing the namespace scope marker lease (namespace-restricted mode only). The namespace-restricted operator renews its lease at this interval to signal it's still running.
renewInterval: 10s
# Controller manager configuration
controllerManager:
......
......@@ -60,6 +60,7 @@ import (
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller"
commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/etcd"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/namespace_scope"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/rbac"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/secret"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/secrets"
......@@ -141,6 +142,9 @@ func main() {
var mpiRunSecretNamespace string
var plannerClusterRoleName string
var dgdrProfilingClusterRoleName string
var namespaceScopeLeaseDuration time.Duration
var namespaceScopeLeaseRenewInterval time.Duration
var operatorVersion string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
......@@ -183,6 +187,12 @@ func main() {
"Name of the ClusterRole for planner (cluster-wide mode only)")
flag.StringVar(&dgdrProfilingClusterRoleName, "dgdr-profiling-cluster-role-name", "",
"Name of the ClusterRole for DGDR profiling jobs (cluster-wide mode only)")
flag.DurationVar(&namespaceScopeLeaseDuration, "namespace-scope-lease-duration", 30*time.Second,
"Duration of namespace scope marker lease before expiration (namespace-restricted mode only)")
flag.DurationVar(&namespaceScopeLeaseRenewInterval, "namespace-scope-lease-renew-interval", 10*time.Second,
"Interval for renewing namespace scope marker lease (namespace-restricted mode only)")
flag.StringVar(&operatorVersion, "operator-version", "unknown",
"Version of the operator (used in lease holder identity)")
opts := zap.Options{
Development: true,
}
......@@ -305,6 +315,80 @@ func main() {
os.Exit(1)
}
// Initialize namespace scope mechanism
var leaseManager *namespace_scope.LeaseManager
var leaseWatcher *namespace_scope.LeaseWatcher
if restrictedNamespace != "" {
// Namespace-restricted mode: Create and maintain namespace scope marker lease
setupLog.Info("Creating namespace scope marker lease manager",
"namespace", restrictedNamespace,
"leaseDuration", namespaceScopeLeaseDuration,
"renewInterval", namespaceScopeLeaseRenewInterval)
leaseManager, err = namespace_scope.NewLeaseManager(
mgr.GetConfig(),
restrictedNamespace,
operatorVersion,
namespaceScopeLeaseDuration,
namespaceScopeLeaseRenewInterval,
)
if err != nil {
setupLog.Error(err, "unable to create namespace scope marker lease manager")
os.Exit(1)
}
// Start the lease manager
if err = leaseManager.Start(mainCtx); err != nil {
setupLog.Error(err, "unable to start namespace scope marker lease manager")
os.Exit(1)
}
// Monitor for fatal lease errors
// If lease renewal fails repeatedly, we must exit to prevent split-brain
go func() {
select {
case err := <-leaseManager.Errors():
setupLog.Error(err, "FATAL: Lease manager encountered unrecoverable error, shutting down to prevent split-brain")
os.Exit(1)
case <-mainCtx.Done():
// Normal shutdown, error channel monitoring no longer needed
return
}
}()
// Ensure lease is released on shutdown
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := leaseManager.Stop(shutdownCtx); err != nil {
setupLog.Error(err, "failed to stop lease manager cleanly")
}
}()
setupLog.Info("Namespace scope marker lease manager started successfully")
} else {
// Cluster-wide mode: Watch for namespace scope marker leases
setupLog.Info("Setting up namespace scope marker lease watcher for cluster-wide mode")
leaseWatcher, err = namespace_scope.NewLeaseWatcher(mgr.GetConfig())
if err != nil {
setupLog.Error(err, "unable to create namespace scope marker lease watcher")
os.Exit(1)
}
// Start the lease watcher
if err = leaseWatcher.Start(mainCtx); err != nil {
setupLog.Error(err, "unable to start namespace scope marker lease watcher")
os.Exit(1)
}
setupLog.Info("Namespace scope marker lease watcher started successfully")
}
// Pass leaseWatcher to controller config for namespace exclusion filtering
ctrlConfig.ExcludedNamespaces = leaseWatcher
// Detect orchestrators availability using discovery client
setupLog.Info("Detecting Grove availability...")
groveEnabled := commonController.DetectGroveAvailability(mainCtx, mgr)
......
......@@ -30,6 +30,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
// ExcludedNamespacesInterface defines the interface for checking namespace exclusions
type ExcludedNamespacesInterface interface {
Contains(namespace string) bool
}
type GroveConfig struct {
// Enabled is automatically determined by checking if Grove CRDs are installed in the cluster
Enabled bool
......@@ -68,6 +73,8 @@ type Config struct {
MpiRun MpiRunConfig
// RBAC configuration for cross-namespace resource management
RBAC RBACConfig
// ExcludedNamespaces is a thread-safe set of namespaces to exclude (cluster-wide mode only)
ExcludedNamespaces ExcludedNamespacesInterface
}
// RBACConfig holds configuration for RBAC management
......@@ -158,6 +165,16 @@ func EphemeralDeploymentEventFilter(config Config) predicate.Predicate {
// in case of a restricted namespace, we only want to process the events that are in the restricted namespace
return objMeta.GetNamespace() == config.RestrictedNamespace
}
// Cluster-wide mode: check if namespace is excluded
if config.ExcludedNamespaces != nil && config.ExcludedNamespaces.Contains(objMeta.GetNamespace()) {
l.V(1).Info("Skipping resource - namespace is excluded",
"namespace", objMeta.GetNamespace(),
"resource", objMeta.GetName(),
"kind", o.GetObjectKind().GroupVersionKind().Kind)
return false
}
// in all other cases, discard the event if it is destined to an ephemeral deployment
if strings.Contains(objMeta.GetNamespace(), "ephemeral") {
return false
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package namespace_scope
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-logr/logr"
coordinationv1 "k8s.io/api/coordination/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/log"
)
const (
// LeaseName is the well-known name for namespace scope marker leases
LeaseName = "dynamo-operator-namespace-scope"
)
// LeaseManager manages the namespace scope marker lease
type LeaseManager struct {
client kubernetes.Interface
namespace string
leaseDuration time.Duration
renewInterval time.Duration
holderIdentity string
operatorVersion string
stopCh chan struct{}
errCh chan error
wg sync.WaitGroup
failureCount int
maxFailures int
logger logr.Logger
}
// NewLeaseManager creates a new lease manager for namespace scope marking
func NewLeaseManager(config *rest.Config, namespace string, operatorVersion string, leaseDuration time.Duration, renewInterval time.Duration) (*LeaseManager, error) {
// Validate inputs
if leaseDuration <= 0 {
return nil, fmt.Errorf("lease duration must be greater than zero, got %v", leaseDuration)
}
if renewInterval <= 0 {
return nil, fmt.Errorf("renew interval must be greater than zero, got %v", renewInterval)
}
if renewInterval >= leaseDuration {
return nil, fmt.Errorf("renew interval (%v) must be less than lease duration (%v)", renewInterval, leaseDuration)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
// Create holder identity with operator version
// No need for pod name since there's only one operator instance in namespace-restricted mode
holderIdentity := fmt.Sprintf("namespace-restricted-operator-%s", operatorVersion)
// Calculate max failures with buffer to ensure operator exits BEFORE lease expires
// This prevents split-brain: if we allow failures for the full lease duration,
// the lease expires at exactly the same time we exit, creating a race condition.
//
// Strategy: Subtract 1 renewal interval as a safety buffer
// Example: 30s lease / 10s renewal = 3 intervals
// maxFailures = 3 - 1 = 2 → operator exits after 20s of failures
// This leaves 10s buffer before lease expires at 30s
rawMaxFailures := int(leaseDuration / renewInterval)
maxFailures := rawMaxFailures - 1
if maxFailures < 1 {
maxFailures = 1 // Always allow at least 1 failure for transient issues
}
return &LeaseManager{
client: client,
namespace: namespace,
leaseDuration: leaseDuration,
renewInterval: renewInterval,
holderIdentity: holderIdentity,
operatorVersion: operatorVersion,
stopCh: make(chan struct{}),
maxFailures: maxFailures,
}, nil
}
// Errors returns a channel that will receive fatal errors from the lease manager
// Callers should monitor this channel and take appropriate action (e.g., exit to prevent split-brain)
func (lm *LeaseManager) Errors() <-chan error {
return lm.errCh
}
// Start creates the lease and begins renewal loop
func (lm *LeaseManager) Start(ctx context.Context) error {
lm.logger = log.FromContext(ctx).WithValues("component", "namespace-scope-lease", "namespace", lm.namespace)
// Initialize error channel
lm.errCh = make(chan error, 1) // buffered to avoid blocking
lm.logger.Info("Starting namespace scope marker lease manager",
"leaseName", LeaseName,
"leaseDuration", lm.leaseDuration,
"renewInterval", lm.renewInterval,
"holderIdentity", lm.holderIdentity,
"maxFailures", lm.maxFailures)
// Create or update the lease initially
if err := lm.createOrUpdateLease(ctx); err != nil {
return fmt.Errorf("failed to create initial lease: %w", err)
}
lm.logger.Info("Namespace scope marker lease created successfully")
// Start renewal loop in background
lm.wg.Add(1)
go lm.renewalLoop(ctx)
return nil
}
// Stop stops the lease renewal loop and releases the lease
func (lm *LeaseManager) Stop(ctx context.Context) error {
lm.logger.Info("Stopping namespace scope marker lease manager")
// Signal renewal loop to stop
close(lm.stopCh)
// Wait for renewal loop to complete to avoid race condition
// where we delete the lease while a renewal is in progress
lm.wg.Wait()
// Delete the lease to signal we're no longer managing this namespace
err := lm.client.CoordinationV1().Leases(lm.namespace).Delete(ctx, LeaseName, metav1.DeleteOptions{})
if err != nil {
// If lease is already deleted (TTL expiry, manual cleanup, etc.), that's fine
// The goal is achieved - the lease is gone
if k8sErrors.IsNotFound(err) {
lm.logger.Info("Namespace scope marker lease already deleted")
return nil
}
// Real failure - return the error
lm.logger.Error(err, "Failed to delete lease on shutdown")
return err
}
lm.logger.Info("Namespace scope marker lease deleted successfully")
return nil
}
// createOrUpdateLease creates or updates the namespace scope marker lease
func (lm *LeaseManager) createOrUpdateLease(ctx context.Context) error {
now := metav1.NewMicroTime(time.Now())
leaseDurationSeconds := int32(lm.leaseDuration.Seconds())
lease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: lm.namespace,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: &lm.holderIdentity,
LeaseDurationSeconds: &leaseDurationSeconds,
AcquireTime: &now,
},
}
// Try to get existing lease
existingLease, err := lm.client.CoordinationV1().Leases(lm.namespace).Get(ctx, LeaseName, metav1.GetOptions{})
if err != nil {
if !k8sErrors.IsNotFound(err) {
return fmt.Errorf("failed to get lease: %w", err)
}
// Lease doesn't exist, create it
_, err = lm.client.CoordinationV1().Leases(lm.namespace).Create(ctx, lease, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create lease: %w", err)
}
lm.logger.Info("Created namespace scope marker lease")
return nil
}
// Lease exists, update it
existingLease.Spec.HolderIdentity = &lm.holderIdentity
existingLease.Spec.LeaseDurationSeconds = &leaseDurationSeconds
existingLease.Spec.RenewTime = &now
_, err = lm.client.CoordinationV1().Leases(lm.namespace).Update(ctx, existingLease, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update lease: %w", err)
}
lm.logger.V(1).Info("Refreshed namespace scope marker lease")
return nil
}
// renewalLoop continuously renews the lease until stopped
func (lm *LeaseManager) renewalLoop(ctx context.Context) {
defer lm.wg.Done()
ticker := time.NewTicker(lm.renewInterval)
defer ticker.Stop()
for {
select {
case <-lm.stopCh:
lm.logger.Info("Lease renewal loop stopped")
return
case <-ctx.Done():
lm.logger.Info("Context cancelled, stopping lease renewal loop")
return
case <-ticker.C:
// Use createOrUpdateLease instead of renewLease for self-healing
// If the lease is manually deleted, it will be automatically recreated
if err := lm.createOrUpdateLease(ctx); err != nil {
lm.failureCount++
lm.logger.Error(err, "Failed to create/update lease, will retry",
"failureCount", lm.failureCount,
"maxFailures", lm.maxFailures,
"nextRetry", lm.renewInterval)
// Warn when approaching max failures
if lm.failureCount == lm.maxFailures-1 {
lm.logger.Error(nil, "WARNING: One more lease renewal failure will cause operator shutdown to prevent split-brain",
"failureCount", lm.failureCount,
"maxFailures", lm.maxFailures)
}
// After max consecutive failures, signal fatal error to prevent split-brain
if lm.failureCount >= lm.maxFailures {
fatalErr := fmt.Errorf("lease renewal failed %d consecutive times (max: %d), operator must exit to prevent split-brain with cluster-wide operator", lm.failureCount, lm.maxFailures)
lm.logger.Error(fatalErr, "FATAL: Max lease renewal failures exceeded")
// Send error to channel (non-blocking)
select {
case lm.errCh <- fatalErr:
default:
// Error already sent, don't block
}
return
}
} else {
// Success: reset failure counter
if lm.failureCount > 0 {
lm.logger.Info("Lease renewal recovered after failures",
"previousFailures", lm.failureCount)
lm.failureCount = 0
}
}
}
}
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package namespace_scope
import (
"context"
"fmt"
"strings"
"testing"
"time"
coordinationv1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/utils/ptr"
)
const (
testNamespace = "test-ns"
testOperatorVersion = "v1.0.0"
)
func TestLeaseManager_CreateOrUpdateLease(t *testing.T) {
tests := []struct {
name string
namespace string
operatorVersion string
existingLease *coordinationv1.Lease
wantRenewTime bool // Whether RenewTime should be set
}{
{
name: "creates lease when it doesn't exist",
namespace: testNamespace,
operatorVersion: testOperatorVersion,
existingLease: nil,
wantRenewTime: false, // RenewTime should be nil on creation
},
{
name: "updates existing lease",
namespace: testNamespace,
operatorVersion: testOperatorVersion,
existingLease: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: testNamespace,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("old-holder"),
LeaseDurationSeconds: ptr.To[int32](60),
AcquireTime: &metav1.MicroTime{Time: time.Now().Add(-2 * time.Minute)},
RenewTime: &metav1.MicroTime{Time: time.Now().Add(-1 * time.Minute)},
},
},
wantRenewTime: true, // RenewTime should be set on update
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create fake client with or without existing lease
var client *fake.Clientset
if tt.existingLease != nil {
client = fake.NewSimpleClientset(tt.existingLease)
} else {
client = fake.NewSimpleClientset()
}
// Create lease manager
lm := &LeaseManager{
client: client,
namespace: tt.namespace,
leaseDuration: 30 * time.Second,
renewInterval: 10 * time.Second,
holderIdentity: "namespace-restricted-operator-" + tt.operatorVersion,
operatorVersion: tt.operatorVersion,
stopCh: make(chan struct{}),
}
// Call createOrUpdateLease
ctx := context.Background()
err := lm.createOrUpdateLease(ctx)
if err != nil {
t.Fatalf("createOrUpdateLease() error = %v", err)
}
// Verify lease exists
lease, err := client.CoordinationV1().Leases(tt.namespace).Get(ctx, LeaseName, metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to get lease: %v", err)
}
// Verify lease name
if lease.Name != LeaseName {
t.Errorf("lease name = %v, want %v", lease.Name, LeaseName)
}
// Verify lease namespace
if lease.Namespace != tt.namespace {
t.Errorf("lease namespace = %v, want %v", lease.Namespace, tt.namespace)
}
// Verify holder identity
if lease.Spec.HolderIdentity == nil {
t.Fatal("lease holder identity is nil")
}
wantIdentity := "namespace-restricted-operator-" + tt.operatorVersion
if *lease.Spec.HolderIdentity != wantIdentity {
t.Errorf("holder identity = %v, want %v", *lease.Spec.HolderIdentity, wantIdentity)
}
// Verify lease duration
if lease.Spec.LeaseDurationSeconds == nil {
t.Fatal("lease duration is nil")
}
if *lease.Spec.LeaseDurationSeconds != 30 {
t.Errorf("lease duration = %v, want %v", *lease.Spec.LeaseDurationSeconds, 30)
}
// Verify renew time and acquire time based on operation
if tt.wantRenewTime {
// Update case: RenewTime should be set and newer than before
if lease.Spec.RenewTime == nil {
t.Error("lease renew time should be set on update")
} else if tt.existingLease != nil && !lease.Spec.RenewTime.After(tt.existingLease.Spec.RenewTime.Time) {
t.Error("renew time was not updated")
}
// AcquireTime should be preserved from existing lease
if tt.existingLease != nil && tt.existingLease.Spec.AcquireTime != nil {
if lease.Spec.AcquireTime == nil {
t.Error("acquire time should be preserved on update")
} else if !lease.Spec.AcquireTime.Equal(tt.existingLease.Spec.AcquireTime) {
t.Error("acquire time should not change on update")
}
}
} else {
// Create case: RenewTime should be nil, AcquireTime should be set
if lease.Spec.RenewTime != nil {
t.Error("lease renew time should be nil on initial creation")
}
if lease.Spec.AcquireTime == nil {
t.Error("lease acquire time should be set on initial creation")
}
}
})
}
}
func TestLeaseManager_Stop(t *testing.T) {
namespace := testNamespace
operatorVersion := testOperatorVersion
// Create fake client with existing lease
existingLease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: namespace,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("namespace-restricted-operator-v1.0.0"),
LeaseDurationSeconds: ptr.To[int32](30),
},
}
client := fake.NewSimpleClientset(existingLease)
// Create lease manager
lm := &LeaseManager{
client: client,
namespace: namespace,
leaseDuration: 30 * time.Second,
renewInterval: 10 * time.Second,
holderIdentity: "namespace-restricted-operator-" + operatorVersion,
operatorVersion: operatorVersion,
stopCh: make(chan struct{}),
}
// Stop lease manager
ctx := context.Background()
err := lm.Stop(ctx)
if err != nil {
t.Fatalf("Stop() error = %v", err)
}
// Verify lease was deleted
_, err = client.CoordinationV1().Leases(namespace).Get(ctx, LeaseName, metav1.GetOptions{})
if err == nil {
t.Error("expected lease to be deleted, but it still exists")
}
}
func TestLeaseManager_Stop_LeaseAlreadyDeleted(t *testing.T) {
namespace := testNamespace
operatorVersion := testOperatorVersion
// Create fake client WITHOUT existing lease (simulating already deleted/expired)
client := fake.NewSimpleClientset()
// Create lease manager
lm := &LeaseManager{
client: client,
namespace: namespace,
leaseDuration: 30 * time.Second,
renewInterval: 10 * time.Second,
holderIdentity: "namespace-restricted-operator-" + operatorVersion,
operatorVersion: operatorVersion,
stopCh: make(chan struct{}),
}
// Stop lease manager - should succeed even though lease doesn't exist
ctx := context.Background()
err := lm.Stop(ctx)
if err != nil {
t.Fatalf("Stop() should succeed when lease is already deleted, got error = %v", err)
}
}
func TestLeaseManager_StartAndStop_CompleteLifecycle(t *testing.T) {
namespace := testNamespace
operatorVersion := testOperatorVersion
// Create fake client
client := fake.NewSimpleClientset()
// Create lease manager with short intervals for testing
lm := &LeaseManager{
client: client,
namespace: namespace,
leaseDuration: 30 * time.Second,
renewInterval: 50 * time.Millisecond,
holderIdentity: "namespace-restricted-operator-" + operatorVersion,
operatorVersion: operatorVersion,
stopCh: make(chan struct{}),
}
// Start the lease manager
ctx := context.Background()
err := lm.Start(ctx)
if err != nil {
t.Fatalf("Start() error = %v", err)
}
// Verify lease was created
lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, LeaseName, metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to get lease after Start(): %v", err)
}
if lease.Name != LeaseName {
t.Errorf("lease name = %v, want %v", lease.Name, LeaseName)
}
// Verify initial lease has no renew time (since it was just created)
if lease.Spec.RenewTime != nil {
t.Error("initial lease should not have renew time set on creation")
}
// Poll for renewal with timeout (more robust than fixed sleep)
renewalDetected := false
deadline := time.Now().Add(500 * time.Millisecond)
for time.Now().Before(deadline) {
updatedLease, err := client.CoordinationV1().Leases(namespace).Get(ctx, LeaseName, metav1.GetOptions{})
if err == nil && updatedLease.Spec.RenewTime != nil {
renewalDetected = true
break
}
time.Sleep(20 * time.Millisecond)
}
if !renewalDetected {
t.Error("lease should have renew time set after renewal")
}
// Stop the lease manager (should delete the lease and stop renewal loop)
stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
err = lm.Stop(stopCtx)
if err != nil {
t.Fatalf("Stop() error = %v", err)
}
// Verify lease was deleted
_, err = client.CoordinationV1().Leases(namespace).Get(ctx, LeaseName, metav1.GetOptions{})
if err == nil {
t.Error("lease should be deleted after Stop()")
}
}
// TestLeaseManager_FailureTracking_SendsErrorOnMaxFailures verifies that consecutive
// lease renewal failures trigger a fatal error to prevent split-brain scenarios.
// Note: This test calls renewalLoop() directly (not Start()) to inject failures via reactor.
func TestLeaseManager_FailureTracking_SendsErrorOnMaxFailures(t *testing.T) {
namespace := testNamespace
operatorVersion := testOperatorVersion
// Create existing lease
existingLease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: namespace,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("namespace-restricted-operator-v1.0.0"),
LeaseDurationSeconds: ptr.To[int32](30),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
// Create fake client with the lease
client := fake.NewSimpleClientset(existingLease)
// Add reactor to make all update operations fail (simulates persistent API failure)
client.PrependReactor("update", "leases", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, fmt.Errorf("simulated persistent API failure for testing")
})
// Create lease manager with short intervals for faster test execution
lm := &LeaseManager{
client: client,
namespace: namespace,
leaseDuration: 30 * time.Second,
renewInterval: 10 * time.Millisecond, // Fast for testing
holderIdentity: "namespace-restricted-operator-" + operatorVersion,
operatorVersion: operatorVersion,
stopCh: make(chan struct{}),
maxFailures: 3,
errCh: make(chan error, 1),
}
// Start renewal loop - all updates will fail
ctx := context.Background()
lm.wg.Add(1)
go lm.renewalLoop(ctx)
// Wait for fatal error on channel (with generous timeout)
select {
case err := <-lm.errCh:
if err == nil {
t.Fatal("expected error from error channel, got nil")
}
t.Logf("Received expected fatal error: %v", err)
// Verify error message is meaningful
if !strings.Contains(err.Error(), "split-brain") {
t.Errorf("error should mention split-brain prevention, got: %v", err)
}
if !strings.Contains(err.Error(), "3") {
t.Errorf("error should mention failure count, got: %v", err)
}
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for fatal error from lease manager (expected within ~30ms)")
}
// Clean shutdown
close(lm.stopCh)
lm.wg.Wait()
}
// TestLeaseManager_FailureTracking_ResetsOnSuccess verifies that the failure counter
// is reset to 0 after a successful renewal, allowing recovery from transient failures.
// Note: This test calls renewalLoop() directly to verify internal failure counter behavior.
func TestLeaseManager_FailureTracking_ResetsOnSuccess(t *testing.T) {
namespace := testNamespace
operatorVersion := testOperatorVersion
// Create fake client with existing lease
existingLease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: namespace,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("namespace-restricted-operator-v1.0.0"),
LeaseDurationSeconds: ptr.To[int32](30),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
client := fake.NewSimpleClientset(existingLease)
// Create lease manager with pre-existing failures
lm := &LeaseManager{
client: client,
namespace: namespace,
leaseDuration: 30 * time.Second,
renewInterval: 20 * time.Millisecond, // Reasonable interval for test
holderIdentity: "namespace-restricted-operator-" + operatorVersion,
operatorVersion: operatorVersion,
stopCh: make(chan struct{}),
maxFailures: 3,
errCh: make(chan error, 1),
failureCount: 2, // Simulates 2 previous failures
}
// Start renewal loop (will succeed and reset counter)
ctx := context.Background()
lm.wg.Add(1)
go lm.renewalLoop(ctx)
// Wait for at least one renewal cycle
time.Sleep(50 * time.Millisecond)
// Stop the loop
close(lm.stopCh)
lm.wg.Wait()
// Verify failure count was reset to 0 after successful renewal
if lm.failureCount != 0 {
t.Errorf("failure count should be reset to 0 after success, got %d", lm.failureCount)
}
// Verify no fatal error was sent
select {
case err := <-lm.errCh:
t.Errorf("unexpected error on channel after successful renewal: %v", err)
default:
// Expected: no error sent
}
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package namespace_scope
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/go-logr/logr"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
k8sCache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// LeaseWatcher watches for namespace scope marker leases and maintains
// an exclusion list for the cluster-wide operator.
// It implements the ExcludedNamespacesInterface from controller_common.
type LeaseWatcher struct {
excludedNamespaces sync.Map // map[string]*coordinationv1.Lease (namespace -> lease object)
informerFactory informers.SharedInformerFactory
logger logr.Logger
}
// NewLeaseWatcher creates a new lease watcher for cluster-wide operator
func NewLeaseWatcher(config *rest.Config) (*LeaseWatcher, error) {
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
// Create informer factory for all namespaces
informerFactory := informers.NewSharedInformerFactory(client, 0)
return &LeaseWatcher{
informerFactory: informerFactory,
}, nil
}
// Contains checks if a namespace is in the exclusion list AND if its lease is still valid.
// This method implements the ExcludedNamespacesInterface.
// It automatically removes expired leases, preventing stale leases from blocking reconciliation.
func (lw *LeaseWatcher) Contains(namespace string) bool {
value, exists := lw.excludedNamespaces.Load(namespace)
if !exists {
return false
}
lease, ok := value.(*coordinationv1.Lease)
if !ok {
// Should never happen, but clean up if it does
lw.logger.Error(nil, "Invalid lease object type in exclusion map",
"namespace", namespace,
"type", fmt.Sprintf("%T", value))
lw.excludedNamespaces.Delete(namespace)
return false
}
// Check if lease has expired (critical for handling stale leases after crashes)
if lw.isLeaseExpired(lease) {
lw.logger.Info("Lease expired during Contains check, resuming cluster-wide processing",
"namespace", namespace,
"renewTime", lease.Spec.RenewTime,
"leaseDuration", lease.Spec.LeaseDurationSeconds)
lw.removeExcludedNamespace(namespace)
return false
}
return true
}
// Start starts watching for namespace scope marker leases
func (lw *LeaseWatcher) Start(ctx context.Context) error {
lw.logger = log.FromContext(ctx).WithValues("component", "namespace-scope-lease-watcher")
lw.logger.Info("Starting namespace scope marker lease watcher")
// Get the lease informer
leaseInformer := lw.informerFactory.Coordination().V1().Leases().Informer()
// Add event handler for namespace scope marker leases
_, err := leaseInformer.AddEventHandler(k8sCache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if lease := lw.extractLease(obj); lease != nil {
lw.handleLeaseAdd(lease)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if lease := lw.extractLease(newObj); lease != nil {
lw.handleLeaseUpdate(lease)
}
},
DeleteFunc: func(obj interface{}) {
if lease := lw.extractLease(obj); lease != nil {
lw.handleLeaseDelete(lease)
}
},
})
if err != nil {
return err
}
// Start informers
lw.informerFactory.Start(ctx.Done())
// Wait for cache sync
if !k8sCache.WaitForCacheSync(ctx.Done(), leaseInformer.HasSynced) {
err := errors.New("failed to sync lease informer cache")
lw.logger.Error(err, "Lease watcher cache sync failed")
return err
}
lw.logger.Info("Namespace scope marker lease watcher started and cache synced")
return nil
}
// extractLease safely extracts a Lease from an event object, handling tombstones.
// Returns nil if the object is not a valid Lease.
func (lw *LeaseWatcher) extractLease(obj any) *coordinationv1.Lease {
// Handle DeletedFinalStateUnknown tombstones
if tombstone, ok := obj.(k8sCache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
// Type assert to Lease
lease, ok := obj.(*coordinationv1.Lease)
if !ok {
lw.logger.V(1).Info("Received non-Lease object in lease watcher, ignoring",
"objectType", fmt.Sprintf("%T", obj))
return nil
}
return lease
}
// handleLeaseAdd handles lease creation events
func (lw *LeaseWatcher) handleLeaseAdd(lease *coordinationv1.Lease) {
// Only process namespace scope marker leases
if !lw.isNamespaceScopeMarker(lease) {
return
}
// Don't add if already expired (defensive - shouldn't happen in practice)
if lw.isLeaseExpired(lease) {
lw.logger.V(1).Info("Ignoring already-expired lease on add",
"namespace", lease.Namespace)
return
}
lw.addExcludedNamespace(lease)
}
// handleLeaseUpdate handles lease update events (renewals)
func (lw *LeaseWatcher) handleLeaseUpdate(lease *coordinationv1.Lease) {
// Only process namespace scope marker leases
if !lw.isNamespaceScopeMarker(lease) {
return
}
// If lease expired, remove from exclusion list
// This handles the critical case where namespace-scoped operator crashes
// without deleting its lease - we detect expiry and resume processing
if lw.isLeaseExpired(lease) {
lw.logger.Info("Lease expired on update, resuming cluster-wide processing",
"namespace", lease.Namespace,
"renewTime", lease.Spec.RenewTime,
"leaseDuration", lease.Spec.LeaseDurationSeconds)
lw.removeExcludedNamespace(lease.Namespace)
return
}
// Lease still valid - update with fresh lease object (refreshes RenewTime)
lw.addExcludedNamespace(lease)
}
// handleLeaseDelete handles lease deletion/expiration events
func (lw *LeaseWatcher) handleLeaseDelete(lease *coordinationv1.Lease) {
// Only process namespace scope marker leases
if !lw.isNamespaceScopeMarker(lease) {
return
}
lw.removeExcludedNamespace(lease.Namespace)
}
// addExcludedNamespace adds a namespace to the exclusion list
func (lw *LeaseWatcher) addExcludedNamespace(lease *coordinationv1.Lease) {
holderIdentity := ""
if lease.Spec.HolderIdentity != nil {
holderIdentity = *lease.Spec.HolderIdentity
}
// Store the full lease object so Contains() can check TTL on every access
lw.excludedNamespaces.Store(lease.Namespace, lease)
lw.logger.Info("Excluding namespace from cluster-wide operator processing",
"namespace", lease.Namespace,
"holderIdentity", holderIdentity)
}
// removeExcludedNamespace removes a namespace from the exclusion list
func (lw *LeaseWatcher) removeExcludedNamespace(namespace string) {
lw.excludedNamespaces.Delete(namespace)
lw.logger.Info("Resuming namespace processing in cluster-wide operator",
"namespace", namespace,
"reason", "namespace-restricted operator lease expired or deleted")
}
// isNamespaceScopeMarker checks if a lease is a namespace scope marker
func (lw *LeaseWatcher) isNamespaceScopeMarker(lease *coordinationv1.Lease) bool {
// A lease is a namespace scope marker if it has the well-known name
// Labels are added for observability/filtering but not required for identification
return lease.Name == LeaseName
}
// isLeaseExpired checks if a lease has exceeded its TTL.
// This is critical for handling stale leases when the namespace-scoped operator
// crashes without gracefully deleting its lease.
func (lw *LeaseWatcher) isLeaseExpired(lease *coordinationv1.Lease) bool {
if lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil {
// Missing required fields - treat as expired for safety
lw.logger.V(1).Info("Lease missing RenewTime or LeaseDurationSeconds, treating as expired",
"namespace", lease.Namespace,
"hasRenewTime", lease.Spec.RenewTime != nil,
"hasLeaseDuration", lease.Spec.LeaseDurationSeconds != nil)
return true
}
expiryTime := lease.Spec.RenewTime.Add(
time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second,
)
return time.Now().After(expiryTime)
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package namespace_scope
import (
"testing"
"time"
"github.com/go-logr/logr"
coordinationv1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sCache "k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
)
func TestLeaseWatcher_HandleLeaseAdd(t *testing.T) {
tests := []struct {
name string
lease *coordinationv1.Lease
shouldExclude bool
excludedNamespace string
}{
{
name: "adds namespace for valid marker lease",
lease: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: "test-ns",
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("namespace-restricted-operator-v1.0.0"),
LeaseDurationSeconds: ptr.To[int32](30),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
},
shouldExclude: true,
excludedNamespace: "test-ns",
},
{
name: "ignores lease with wrong name",
lease: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "other-lease",
Namespace: "test-ns",
},
},
shouldExclude: false,
excludedNamespace: "test-ns",
},
{
name: "adds namespace for lease without labels",
lease: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: "test-ns",
// No labels - still identified by name
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("namespace-restricted-operator-v1.0.0"),
LeaseDurationSeconds: ptr.To[int32](30),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
},
shouldExclude: true,
excludedNamespace: "test-ns",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := logr.Discard()
lw := &LeaseWatcher{
logger: logger,
}
// Handle lease add
lw.handleLeaseAdd(tt.lease)
// Check if namespace was excluded
got := lw.Contains(tt.excludedNamespace)
if got != tt.shouldExclude {
t.Errorf("namespace exclusion = %v, want %v", got, tt.shouldExclude)
}
})
}
}
func TestLeaseWatcher_HandleLeaseUpdate(t *testing.T) {
logger := logr.Discard()
lw := &LeaseWatcher{
logger: logger,
}
lease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: "test-ns",
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("namespace-restricted-operator-v1.0.0"),
LeaseDurationSeconds: ptr.To[int32](30),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
// Handle lease update (should add if not present)
lw.handleLeaseUpdate(lease)
// Verify namespace was added
if !lw.Contains("test-ns") {
t.Error("namespace should be excluded after update")
}
// Handle another update (should remain)
lw.handleLeaseUpdate(lease)
// Verify namespace is still excluded
if !lw.Contains("test-ns") {
t.Error("namespace should still be excluded after second update")
}
}
func TestLeaseWatcher_HandleLeaseDelete(t *testing.T) {
logger := logr.Discard()
lw := &LeaseWatcher{
logger: logger,
}
lease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: "test-ns",
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("operator-v1.0.0"),
LeaseDurationSeconds: ptr.To[int32](30),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
// Pre-add namespace
lw.addExcludedNamespace(lease)
// Verify namespace is excluded before delete
if !lw.Contains("test-ns") {
t.Fatal("namespace should be excluded before delete")
}
// Handle lease delete
lw.handleLeaseDelete(lease)
// Verify namespace was removed
if lw.Contains("test-ns") {
t.Error("namespace should not be excluded after delete")
}
}
func TestLeaseWatcher_ExtractLease(t *testing.T) {
logger := logr.Discard()
lw := &LeaseWatcher{
logger: logger,
}
tests := []struct {
name string
obj any
wantNil bool
wantName string
}{
{
name: "extracts regular lease object",
obj: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "test-lease",
Namespace: "test-ns",
},
},
wantNil: false,
wantName: "test-lease",
},
{
name: "extracts lease from tombstone",
obj: k8sCache.DeletedFinalStateUnknown{
Key: "test-ns/test-lease",
Obj: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "test-lease",
Namespace: "test-ns",
},
},
},
wantNil: false,
wantName: "test-lease",
},
{
name: "returns nil for non-Lease object",
obj: &coordinationv1.LeaseList{},
wantNil: true,
},
{
name: "returns nil for tombstone with non-Lease object",
obj: k8sCache.DeletedFinalStateUnknown{
Key: "test-ns/test-obj",
Obj: &coordinationv1.LeaseList{},
},
wantNil: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lease := lw.extractLease(tt.obj)
if (lease == nil) != tt.wantNil {
t.Errorf("extractLease() returned nil = %v, want nil = %v", lease == nil, tt.wantNil)
}
if !tt.wantNil && lease.Name != tt.wantName {
t.Errorf("extractLease() lease.Name = %v, want %v", lease.Name, tt.wantName)
}
})
}
}
func TestLeaseWatcher_IsNamespaceScopeMarker(t *testing.T) {
logger := logr.Discard()
lw := &LeaseWatcher{
logger: logger,
}
tests := []struct {
name string
lease *coordinationv1.Lease
want bool
}{
{
name: "returns true for lease with correct name",
lease: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: "test-ns",
},
},
want: true,
},
{
name: "returns true for lease with correct name and no labels",
lease: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: "test-ns",
},
},
want: true,
},
{
name: "returns false for lease with wrong name",
lease: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "other-lease",
Namespace: "test-ns",
},
},
want: false,
},
{
name: "returns false for lease with wrong name even if other metadata exists",
lease: &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "wrong-name",
Namespace: "test-ns",
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := lw.isNamespaceScopeMarker(tt.lease)
if got != tt.want {
t.Errorf("isNamespaceScopeMarker() = %v, want %v", got, tt.want)
}
})
}
}
func TestLeaseWatcher_MultipleNamespaces(t *testing.T) {
logger := logr.Discard()
lw := &LeaseWatcher{
logger: logger,
}
// Add multiple namespaces
namespaces := []string{"test-ns", "staging", "dev"}
for _, ns := range namespaces {
lease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: ns,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: ptr.To("operator-v1.0.0"),
LeaseDurationSeconds: ptr.To[int32](30),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
lw.handleLeaseAdd(lease)
}
// Verify all are excluded
for _, ns := range namespaces {
if !lw.Contains(ns) {
t.Errorf("namespace %s should be excluded", ns)
}
}
// Delete one namespace
deleteLease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: LeaseName,
Namespace: "staging",
},
}
lw.handleLeaseDelete(deleteLease)
// Verify staging is removed but others remain
if lw.Contains("staging") {
t.Error("staging should not be excluded after delete")
}
if !lw.Contains("test-ns") {
t.Error("test-ns should still be excluded")
}
if !lw.Contains("dev") {
t.Error("dev should still be excluded")
}
}
......@@ -19,6 +19,85 @@ Dynamo operator is a Kubernetes operator that simplifies the deployment, configu
3. Kubernetes resources (Deployments, Services, etc.) are created or updated to match the CR spec.
4. Status fields are updated to reflect the current state.
## Deployment Modes
The Dynamo operator supports three deployment modes to accommodate different cluster environments and use cases:
### 1. Cluster-Wide Mode (Default)
The operator monitors and manages DynamoGraph resources across **all namespaces** in the cluster.
**When to Use:**
- You have full cluster admin access
- You want centralized management of all Dynamo workloads
- Standard production deployment on a dedicated cluster
---
### 2. Namespace-Scoped Mode
The operator monitors and manages DynamoGraph resources **only in a specific namespace**. A lease marker is created to signal the operator's presence to any cluster-wide operators.
**When to Use:**
- You're on a shared/multi-tenant cluster
- You only have namespace-level permissions
- You want to test a new operator version in isolation
- You need to avoid conflicts with other operators
**Installation:**
```bash
helm install dynamo-platform dynamo-platform-${RELEASE_VERSION}.tgz \
--namespace my-namespace \
--create-namespace \
--set dynamo-operator.namespaceRestriction.enabled=true
```
---
### 3. Hybrid Mode
A **cluster-wide operator** manages most namespaces, while **one or more namespace-scoped operators** run in specific namespaces (e.g., for testing new versions). The cluster-wide operator automatically detects and excludes namespaces with namespace-scoped operators using lease markers.
**When to Use:**
- Running production workloads with a stable operator version
- Testing new operator versions in isolated namespaces without affecting production
- Gradual rollout of operator updates
- Development/staging environments on production clusters
**How It Works:**
1. Namespace-scoped operator creates a lease named `dynamo-operator-namespace-scope` in its namespace
2. Cluster-wide operator watches for these lease markers across all namespaces
3. Cluster-wide operator automatically excludes any namespace with a lease marker
4. If namespace-scoped operator stops, its lease expires (TTL: 30s by default)
5. Cluster-wide operator automatically resumes managing that namespace
**Setup Example:**
```bash
# 1. Install cluster-wide operator (production, v1.0.0)
helm install dynamo-platform dynamo-platform-${RELEASE_VERSION}.tgz \
--namespace dynamo-system \
--create-namespace
# 2. Install namespace-scoped operator (testing, v2.0.0-beta)
helm install dynamo-test dynamo-platform-${RELEASE_VERSION}.tgz \
--namespace test-namespace \
--create-namespace \
--set dynamo-operator.namespaceRestriction.enabled=true \
--set dynamo-operator.controllerManager.manager.image.tag=v2.0.0-beta
**Observability:**
```bash
# List all namespaces with local operators
kubectl get lease -A --field-selector metadata.name=dynamo-operator-namespace-scope
# Check which operator version is running in a namespace
kubectl get lease -n my-namespace dynamo-operator-namespace-scope \
-o jsonpath='{.spec.holderIdentity}'
```
## Custom Resource Definitions (CRDs)
For the complete technical API reference for Dynamo Custom Resource Definitions, see:
......@@ -39,14 +118,7 @@ helm fetch https://helm.ngc.nvidia.com/nvidia/ai-dynamo/charts/dynamo-platform-$
helm install dynamo-platform dynamo-platform-${RELEASE_VERSION}.tgz --namespace ${NAMESPACE} --create-namespace
```
For namespace-restricted installations (shared clusters), you'll need to install the Dynamo platform in each namespace you want to deploy to.
Namespace restriction is enabled by setting the `dynamo-operator.namespaceRestriction.enabled` flag to `true`.
```bash
helm install dynamo-platform dynamo-platform-${RELEASE_VERSION}.tgz \
--namespace ${NAMESPACE} \
--create-namespace \
--set dynamo-operator.namespaceRestriction.enabled=true
```
> **Note:** For shared/multi-tenant clusters or testing scenarios, see [Deployment Modes](#deployment-modes) above for namespace-scoped and hybrid configurations.
### Building from Source
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment