Unverified Commit 53a609e5 authored by Julien Mancuso's avatar Julien Mancuso Committed by GitHub
Browse files

feat: add dynamo operator observability (#5543)


Signed-off-by: default avatarJulien Mancuso <jmancuso@nvidia.com>
parent 1032076d
......@@ -12,20 +12,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{{- if and .Values.metricsService.enabled (not .Values.namespaceRestriction.enabled) }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
{{- if and .Values.metricsService.enabled (.Capabilities.APIVersions.Has "monitoring.coreos.com/v1") }}
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ include "dynamo-operator.fullname" . }}-metrics-reader
name: {{ include "dynamo-operator.fullname" . }}-operator
namespace: {{ .Release.Namespace }}
labels:
app.kubernetes.io/component: kube-rbac-proxy
app.kubernetes.io/created-by: dynamo-operator
app.kubernetes.io/part-of: dynamo-operator
release: prometheus
{{- include "dynamo-operator.labels" . | nindent 4 }}
rules:
- nonResourceURLs:
- /metrics
verbs:
- get
spec:
selector:
matchLabels:
control-plane: controller-manager
{{- include "dynamo-operator.selectorLabels" . | nindent 6 }}
endpoints:
- port: https
scheme: https
path: /metrics
interval: 15s
tlsConfig:
insecureSkipVerify: true
{{- end }}
......@@ -52,6 +52,7 @@ controllerManager:
- --upstream=http://127.0.0.1:8080/
- --logtostderr=true
- --v=0
- --ignore-paths=/metrics
containerSecurityContext:
allowPrivilegeEscalation: false
capabilities:
......@@ -127,6 +128,7 @@ dynamo:
#imagePullSecrets: []
kubernetesClusterDomain: cluster.local
metricsService:
enabled: true
ports:
- name: https
port: 8443
......
This diff is collapsed.
......@@ -18,9 +18,12 @@
package v1alpha1
import (
"fmt"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
)
// DynamoModelSpec defines the desired state of DynamoModel
......@@ -144,3 +147,26 @@ func (m *DynamoModel) HasEndpoints() bool {
func (m *DynamoModel) HasReadyEndpoints() bool {
return m.Status.ReadyEndpoints > 0
}
// IsReady returns true if all endpoints are ready
func (m *DynamoModel) IsReady() (bool, string) {
if m.Status.TotalEndpoints == 0 {
return false, "No endpoints configured"
}
if m.Status.ReadyEndpoints == 0 {
return false, "No endpoints ready"
}
if m.Status.ReadyEndpoints < m.Status.TotalEndpoints {
return false, fmt.Sprintf("Only %d/%d endpoints ready", m.Status.ReadyEndpoints, m.Status.TotalEndpoints)
}
return true, ""
}
// GetState returns "ready" or "not_ready" based on endpoint status
func (m *DynamoModel) GetState() string {
ready, _ := m.IsReady()
if ready {
return consts.ResourceStateReady
}
return consts.ResourceStateNotReady
}
......@@ -229,6 +229,15 @@ func (s *DynamoComponentDeployment) IsReady() (bool, string) {
return ready, reason
}
// GetState returns "ready" or "not_ready" based on conditions
func (d *DynamoComponentDeployment) GetState() string {
ready, _ := d.IsReady()
if ready {
return commonconsts.ResourceStateReady
}
return commonconsts.ResourceStateNotReady
}
func (s *DynamoComponentDeployment) GetServiceStatuses() map[string]ServiceReplicaStatus {
if s.Status.Service == nil {
return map[string]ServiceReplicaStatus{}
......
......@@ -22,6 +22,8 @@ package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
......@@ -187,6 +189,14 @@ func (s *DynamoGraphDeployment) SetState(state string) {
s.Status.State = state
}
// GetState returns the current lifecycle state
func (d *DynamoGraphDeployment) GetState() string {
if d.Status.State == "" {
return consts.ResourceStateUnknown
}
return d.Status.State
}
// +kubebuilder:object:root=true
// DynamoGraphDeploymentList contains a list of DynamoGraphDeployment.
......
......@@ -28,6 +28,8 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
......@@ -273,6 +275,14 @@ func (s *DynamoGraphDeploymentRequest) SetState(state string) {
s.Status.State = state
}
// GetState returns the current lifecycle state
func (d *DynamoGraphDeploymentRequest) GetState() string {
if d.Status.State == "" {
return consts.ResourceStateUnknown
}
return d.Status.State
}
// GetSpec returns the spec of this DGDR as a generic interface.
// Implements a common interface used by controller utilities.
func (s *DynamoGraphDeploymentRequest) GetSpec() any {
......
......@@ -19,6 +19,8 @@ package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
)
// DynamoGraphDeploymentScalingAdapterSpec defines the desired state of DynamoGraphDeploymentScalingAdapter
......@@ -100,3 +102,23 @@ type DynamoGraphDeploymentScalingAdapterList struct {
func init() {
SchemeBuilder.Register(&DynamoGraphDeploymentScalingAdapter{}, &DynamoGraphDeploymentScalingAdapterList{})
}
// IsReady returns true if the adapter has active replicas and a selector
func (d *DynamoGraphDeploymentScalingAdapter) IsReady() (bool, string) {
if d.Status.Selector == "" {
return false, "Selector not set"
}
if d.Status.Replicas == 0 {
return false, "No replicas"
}
return true, ""
}
// GetState returns "ready" or "not_ready"
func (d *DynamoGraphDeploymentScalingAdapter) GetState() string {
ready, _ := d.IsReady()
if ready {
return consts.ResourceStateReady
}
return consts.ResourceStateNotReady
}
......@@ -62,6 +62,7 @@ import (
"github.com/ai-dynamo/dynamo/deploy/operator/internal/etcd"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/modelendpoint"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/namespace_scope"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/rbac"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/secret"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/secrets"
......@@ -339,6 +340,10 @@ func main() {
os.Exit(1)
}
// Initialize observability metrics
setupLog.Info("Initializing observability metrics")
observability.InitMetrics()
// Initialize namespace scope mechanism
var leaseManager *namespace_scope.LeaseManager
var leaseWatcher *namespace_scope.LeaseWatcher
......@@ -408,10 +413,14 @@ func main() {
}
setupLog.Info("Namespace scope marker lease watcher started successfully")
}
// Pass leaseWatcher to controller config for namespace exclusion filtering
ctrlConfig.ExcludedNamespaces = leaseWatcher
}
// Start resource counter background goroutine (after ExcludedNamespaces is set)
setupLog.Info("Starting resource counter")
go observability.StartResourceCounter(mainCtx, mgr.GetClient(), ctrlConfig.ExcludedNamespaces)
// Detect orchestrators availability using discovery client
setupLog.Info("Detecting Grove availability...")
......
......@@ -97,6 +97,19 @@ const (
MainContainerName = "main"
RestartAnnotation = "nvidia.com/restartAt"
// Resource type constants - match Kubernetes Kind names
// Used consistently across controllers, webhooks, and metrics
ResourceTypeDynamoGraphDeployment = "DynamoGraphDeployment"
ResourceTypeDynamoComponentDeployment = "DynamoComponentDeployment"
ResourceTypeDynamoModel = "DynamoModel"
ResourceTypeDynamoGraphDeploymentRequest = "DynamoGraphDeploymentRequest"
ResourceTypeDynamoGraphDeploymentScalingAdapter = "DynamoGraphDeploymentScalingAdapter"
// Resource state constants - used in status reporting and metrics
ResourceStateReady = "ready"
ResourceStateNotReady = "not_ready"
ResourceStateUnknown = "unknown"
)
type MultinodeDeploymentType string
......
......@@ -42,6 +42,7 @@ import (
commoncontroller "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/modelendpoint"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
webhookvalidation "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook/validation"
)
......@@ -294,6 +295,7 @@ func (r *DynamoModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.DynamoModel{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Named(consts.ResourceTypeDynamoModel).
// Watch EndpointSlices - reconcile when endpoints change (Service changes trigger EndpointSlice updates)
Watches(
&discoveryv1.EndpointSlice{},
......@@ -303,7 +305,7 @@ func (r *DynamoModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
}),
).
WithEventFilter(commoncontroller.EphemeralDeploymentEventFilter(r.Config)). // set the event filter to ignore resources handled by other controllers in namespace-restricted mode
Complete(r)
Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoModel))
}
// findModelsForEndpointSlice maps an EndpointSlice to DynamoModels
......
......@@ -38,6 +38,7 @@ import (
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
webhookvalidation "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook/validation"
networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
......@@ -1386,6 +1387,7 @@ type IngressConfig struct {
func (r *DynamoComponentDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
m := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Named(commonconsts.ResourceTypeDynamoComponentDeployment).
Owns(&appsv1.Deployment{}, builder.WithPredicates(predicate.Funcs{
// ignore creation cause we don't want to be called again after we create the deployment
CreateFunc: func(ce event.CreateEvent) bool { return false },
......@@ -1419,7 +1421,9 @@ func (r *DynamoComponentDeploymentReconciler) SetupWithManager(mgr ctrl.Manager)
m.Owns(&networkingv1beta1.VirtualService{}, builder.WithPredicates(predicate.GenerationChangedPredicate{}))
}
m.Owns(&autoscalingv2.HorizontalPodAutoscaler{})
return m.Complete(r)
// Wrap with metrics collection
observedReconciler := observability.NewObservedReconciler(r, commonconsts.ResourceTypeDynamoComponentDeployment)
return m.Complete(observedReconciler)
}
func (r *DynamoComponentDeploymentReconciler) GetRecorder() record.EventRecorder {
......
......@@ -51,6 +51,7 @@ import (
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commoncontroller "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
webhookvalidation "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook/validation"
rbacv1 "k8s.io/api/rbac/v1"
)
......@@ -1174,7 +1175,7 @@ func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) err
For(&nvidiacomv1alpha1.DynamoGraphDeployment{}, builder.WithPredicates(
predicate.GenerationChangedPredicate{},
)).
Named("dynamographdeployment").
Named(consts.ResourceTypeDynamoGraphDeployment).
Owns(&nvidiacomv1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.Funcs{
// ignore creation cause we don't want to be called again after we create the deployment
CreateFunc: func(ce event.CreateEvent) bool { return false },
......@@ -1229,7 +1230,9 @@ func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) err
}),
)
}
return ctrlBuilder.Complete(r)
// Wrap with metrics collection
observedReconciler := observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeployment)
return ctrlBuilder.Complete(observedReconciler)
}
func (r *DynamoGraphDeploymentReconciler) GetRecorder() record.EventRecorder {
......
......@@ -45,7 +45,9 @@ import (
sigsyaml "sigs.k8s.io/yaml"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
webhookvalidation "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook/validation"
)
......@@ -1554,6 +1556,7 @@ func (r *DynamoGraphDeploymentRequestReconciler) updateStateWithCondition(
func (r *DynamoGraphDeploymentRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&nvidiacomv1alpha1.DynamoGraphDeploymentRequest{}).
Named(consts.ResourceTypeDynamoGraphDeploymentRequest).
Owns(&batchv1.Job{}, builder.WithPredicates(predicate.Funcs{
// ignore creation cause we don't want to be called again after we create the job
CreateFunc: func(ce event.CreateEvent) bool { return false },
......@@ -1587,5 +1590,5 @@ func (r *DynamoGraphDeploymentRequestReconciler) SetupWithManager(mgr ctrl.Manag
}),
). // Watch DGDs created by this controller (via label)
WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config)). // set the event filter to ignore resources handled by other controllers in namespace-restricted mode
Complete(r)
Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeploymentRequest))
}
......@@ -39,6 +39,7 @@ import (
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
)
// DynamoGraphDeploymentScalingAdapterReconciler reconciles a DynamoGraphDeploymentScalingAdapter object
......@@ -155,7 +156,7 @@ func (r *DynamoGraphDeploymentScalingAdapterReconciler) SetupWithManager(mgr ctr
For(&nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter{}, builder.WithPredicates(
predicate.GenerationChangedPredicate{},
)).
Named("dgdscalingadapter").
Named(consts.ResourceTypeDynamoGraphDeploymentScalingAdapter).
// Watch DGDs to sync status when DGD service replicas change
Watches(
&nvidiacomv1alpha1.DynamoGraphDeployment{},
......@@ -177,7 +178,7 @@ func (r *DynamoGraphDeploymentScalingAdapterReconciler) SetupWithManager(mgr ctr
}),
).
WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config)).
Complete(r)
Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeploymentScalingAdapter))
}
// findAdaptersForDGD maps DGD changes to adapter reconcile requests
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observability
import (
"time"
"github.com/prometheus/client_golang/prometheus"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)
const (
metricsNamespace = "dynamo_operator"
)
var (
// Reconciliation metrics
reconcileDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "reconcile_duration_seconds",
Help: "Duration of reconciliation loops in seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30, 60},
},
[]string{"resource_type", "namespace", "result"},
)
reconcileTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "reconcile_total",
Help: "Total number of reconciliations by resource type",
},
[]string{"resource_type", "namespace", "result"},
)
reconcileErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "reconcile_errors_total",
Help: "Total number of reconciliation errors by resource type and error type",
},
[]string{"resource_type", "namespace", "error_type"},
)
// Resource metrics (populated by resource counter)
resourcesTotal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "resources_total",
Help: "Total number of resources by type and status",
},
[]string{"resource_type", "namespace", "status"},
)
// Webhook metrics
webhookDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "webhook_duration_seconds",
Help: "Duration of webhook validation in seconds",
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
},
[]string{"resource_type", "operation"},
)
webhookRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "webhook_requests_total",
Help: "Total number of webhook admission requests",
},
[]string{"resource_type", "operation", "result"},
)
webhookDenialsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "webhook_denials_total",
Help: "Total number of webhook admission denials",
},
[]string{"resource_type", "operation", "reason"},
)
)
// InitMetrics registers all custom metrics with the controller-runtime metrics registry
func InitMetrics() {
ctrlmetrics.Registry.MustRegister(
reconcileDuration,
reconcileTotal,
reconcileErrors,
resourcesTotal,
webhookDuration,
webhookRequestsTotal,
webhookDenialsTotal,
)
}
// RecordReconciliation records metrics for a reconciliation loop
func RecordReconciliation(resourceType, namespace string, err error, requeue bool, duration time.Duration) {
result := "success"
if err != nil {
result = "error"
errorType := categorizeError(err)
reconcileErrors.WithLabelValues(resourceType, namespace, errorType).Inc()
} else if requeue {
result = "requeue"
}
reconcileDuration.WithLabelValues(resourceType, namespace, result).Observe(duration.Seconds())
reconcileTotal.WithLabelValues(resourceType, namespace, result).Inc()
}
// RecordWebhookAdmission records metrics for a webhook admission request
func RecordWebhookAdmission(resourceType, operation string, allowed bool, duration time.Duration) {
webhookDuration.WithLabelValues(resourceType, operation).Observe(duration.Seconds())
result := "allowed"
if !allowed {
result = "denied"
}
webhookRequestsTotal.WithLabelValues(resourceType, operation, result).Inc()
}
// RecordWebhookDenial records a webhook denial with a categorized reason
func RecordWebhookDenial(resourceType, operation string, err error) {
reason := categorizeError(err)
webhookDenialsTotal.WithLabelValues(resourceType, operation, reason).Inc()
}
// UpdateResourceCount updates the gauge for a specific resource type and status
func UpdateResourceCount(resourceType, namespace, status string, count float64) {
resourcesTotal.WithLabelValues(resourceType, namespace, status).Set(count)
}
// categorizeError categorizes Kubernetes errors for better metrics granularity
func categorizeError(err error) string {
if err == nil {
return "none"
}
switch {
case k8serrors.IsNotFound(err):
return "not_found"
case k8serrors.IsAlreadyExists(err):
return "already_exists"
case k8serrors.IsConflict(err):
return "conflict"
case k8serrors.IsInvalid(err):
return "validation"
case k8serrors.IsBadRequest(err):
return "bad_request"
case k8serrors.IsUnauthorized(err):
return "unauthorized"
case k8serrors.IsForbidden(err):
return "forbidden"
case k8serrors.IsTimeout(err):
return "timeout"
case k8serrors.IsServerTimeout(err):
return "server_timeout"
case k8serrors.IsServiceUnavailable(err):
return "unavailable"
case k8serrors.IsTooManyRequests(err):
return "rate_limited"
default:
return "internal"
}
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observability
import (
"context"
"time"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// ObservedReconciler wraps any reconciler and automatically records metrics
// for reconciliation duration, results, and errors.
type ObservedReconciler struct {
reconcile.Reconciler
resourceType string
}
// NewObservedReconciler creates a new ObservedReconciler wrapper
func NewObservedReconciler(r reconcile.Reconciler, resourceType string) *ObservedReconciler {
return &ObservedReconciler{
Reconciler: r,
resourceType: resourceType,
}
}
// Reconcile wraps the underlying reconciler's Reconcile method with metrics collection
func (m *ObservedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
startTime := time.Now()
result, err := m.Reconciler.Reconcile(ctx, req)
duration := time.Since(startTime)
// Determine if a requeue is happening
//nolint:staticcheck // SA1019: result.Requeue is deprecated but still supported by controller-runtime
requeue := result.Requeue || result.RequeueAfter > 0
// Record reconciliation metrics
RecordReconciliation(
m.resourceType,
req.Namespace,
err,
requeue,
duration,
)
return result, err
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observability
import (
"context"
"time"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
)
const (
resourceCountInterval = 30 * time.Second
)
// StateProvider defines the interface for resources that can report their state
type StateProvider interface {
client.Object
GetState() string
}
// ExcludedNamespaces defines the interface for checking namespace exclusions
type ExcludedNamespaces interface {
Contains(namespace string) bool
}
// StartResourceCounter starts a background goroutine that periodically updates resource count metrics.
// It uses the manager's cached client to avoid loading the API server.
// The client's cache scope is automatically determined by the manager's configuration:
// - Namespace-restricted operators: cache is scoped to specific namespace
// - Cluster-wide operators: cache includes all namespaces (except those filtered by excludedNamespaces)
// The excludedNamespaces parameter allows filtering out namespaces managed by namespace-restricted operators.
func StartResourceCounter(ctx context.Context, c client.Client, excludedNamespaces ExcludedNamespaces) {
logger := log.FromContext(ctx).WithName("resource-counter")
logger.Info("Starting resource counter", "interval", resourceCountInterval)
ticker := time.NewTicker(resourceCountInterval)
defer ticker.Stop()
// Initial update
updateResourceMetrics(ctx, c, excludedNamespaces, logger)
for {
select {
case <-ctx.Done():
logger.Info("Stopping resource counter")
return
case <-ticker.C:
updateResourceMetrics(ctx, c, excludedNamespaces, logger)
}
}
}
// updateResourceMetrics queries all CRDs and updates gauges
// The client's cache scope determines which namespaces are queried
func updateResourceMetrics(ctx context.Context, c client.Client, excludedNamespaces ExcludedNamespaces, logger logr.Logger) {
// Count all resource types
updateDynamoGraphDeploymentCounts(ctx, c, excludedNamespaces, logger)
updateDynamoComponentDeploymentCounts(ctx, c, excludedNamespaces, logger)
updateDynamoModelCounts(ctx, c, excludedNamespaces, logger)
updateDynamoGraphDeploymentRequestCounts(ctx, c, excludedNamespaces, logger)
updateDynamoGraphDeploymentScalingAdapterCounts(ctx, c, excludedNamespaces, logger)
}
// countResourcesByState is a generic helper for all resources with GetState()
// It takes a slice of value types and iterates by index to avoid extra allocations
func countResourcesByState[T any, PT StateProvider](
items []T,
excludedNamespaces ExcludedNamespaces,
resourceType string,
toPtrFunc func(*T) PT,
) {
// Count by state and namespace
counts := make(map[string]map[string]int)
for i := range items {
item := toPtrFunc(&items[i])
namespace := item.GetNamespace()
// Skip if namespace is managed by a namespace-restricted operator
if excludedNamespaces != nil && excludedNamespaces.Contains(namespace) {
continue
}
state := item.GetState()
if counts[namespace] == nil {
counts[namespace] = make(map[string]int)
}
counts[namespace][state]++
}
// Update metrics
for namespace, stateCounts := range counts {
for state, count := range stateCounts {
UpdateResourceCount(resourceType, namespace, state, float64(count))
}
}
}
func updateDynamoGraphDeploymentCounts(ctx context.Context, c client.Client, excludedNamespaces ExcludedNamespaces, logger logr.Logger) {
dgdList := &v1alpha1.DynamoGraphDeploymentList{}
if err := c.List(ctx, dgdList); err != nil {
logger.Error(err, "failed to list DynamoGraphDeployments")
return
}
countResourcesByState(
dgdList.Items,
excludedNamespaces,
consts.ResourceTypeDynamoGraphDeployment,
func(d *v1alpha1.DynamoGraphDeployment) *v1alpha1.DynamoGraphDeployment { return d },
)
}
func updateDynamoComponentDeploymentCounts(ctx context.Context, c client.Client, excludedNamespaces ExcludedNamespaces, logger logr.Logger) {
dcdList := &v1alpha1.DynamoComponentDeploymentList{}
if err := c.List(ctx, dcdList); err != nil {
logger.Error(err, "failed to list DynamoComponentDeployments")
return
}
countResourcesByState(
dcdList.Items,
excludedNamespaces,
consts.ResourceTypeDynamoComponentDeployment,
func(d *v1alpha1.DynamoComponentDeployment) *v1alpha1.DynamoComponentDeployment { return d },
)
}
func updateDynamoModelCounts(ctx context.Context, c client.Client, excludedNamespaces ExcludedNamespaces, logger logr.Logger) {
dmList := &v1alpha1.DynamoModelList{}
if err := c.List(ctx, dmList); err != nil {
logger.Error(err, "failed to list DynamoModels")
return
}
countResourcesByState(
dmList.Items,
excludedNamespaces,
consts.ResourceTypeDynamoModel,
func(m *v1alpha1.DynamoModel) *v1alpha1.DynamoModel { return m },
)
}
func updateDynamoGraphDeploymentRequestCounts(ctx context.Context, c client.Client, excludedNamespaces ExcludedNamespaces, logger logr.Logger) {
dgdrList := &v1alpha1.DynamoGraphDeploymentRequestList{}
if err := c.List(ctx, dgdrList); err != nil {
logger.Error(err, "failed to list DynamoGraphDeploymentRequests")
return
}
countResourcesByState(
dgdrList.Items,
excludedNamespaces,
consts.ResourceTypeDynamoGraphDeploymentRequest,
func(d *v1alpha1.DynamoGraphDeploymentRequest) *v1alpha1.DynamoGraphDeploymentRequest { return d },
)
}
func updateDynamoGraphDeploymentScalingAdapterCounts(ctx context.Context, c client.Client, excludedNamespaces ExcludedNamespaces, logger logr.Logger) {
dgdsaList := &v1alpha1.DynamoGraphDeploymentScalingAdapterList{}
if err := c.List(ctx, dgdsaList); err != nil {
logger.Error(err, "failed to list DynamoGraphDeploymentScalingAdapters")
return
}
countResourcesByState(
dgdsaList.Items,
excludedNamespaces,
consts.ResourceTypeDynamoGraphDeploymentScalingAdapter,
func(d *v1alpha1.DynamoGraphDeploymentScalingAdapter) *v1alpha1.DynamoGraphDeploymentScalingAdapter {
return d
},
)
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package observability
import (
"context"
"time"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
// ObservedValidator wraps any CustomValidator and automatically records metrics
// for webhook validation duration, results, and denials.
type ObservedValidator struct {
admission.CustomValidator
resourceType string
}
// NewObservedValidator creates a new ObservedValidator wrapper
func NewObservedValidator(v admission.CustomValidator, resourceType string) *ObservedValidator {
return &ObservedValidator{
CustomValidator: v,
resourceType: resourceType,
}
}
// ValidateCreate wraps the underlying validator's ValidateCreate method with metrics collection
func (m *ObservedValidator) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
startTime := time.Now()
warnings, err := m.CustomValidator.ValidateCreate(ctx, obj)
duration := time.Since(startTime)
allowed := err == nil
RecordWebhookAdmission(m.resourceType, "CREATE", allowed, duration)
if !allowed {
RecordWebhookDenial(m.resourceType, "CREATE", err)
}
return warnings, err
}
// ValidateUpdate wraps the underlying validator's ValidateUpdate method with metrics collection
func (m *ObservedValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {
startTime := time.Now()
warnings, err := m.CustomValidator.ValidateUpdate(ctx, oldObj, newObj)
duration := time.Since(startTime)
allowed := err == nil
RecordWebhookAdmission(m.resourceType, "UPDATE", allowed, duration)
if !allowed {
RecordWebhookDenial(m.resourceType, "UPDATE", err)
}
return warnings, err
}
// ValidateDelete wraps the underlying validator's ValidateDelete method with metrics collection
func (m *ObservedValidator) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
startTime := time.Now()
warnings, err := m.CustomValidator.ValidateDelete(ctx, obj)
duration := time.Since(startTime)
allowed := err == nil
RecordWebhookAdmission(m.resourceType, "DELETE", allowed, duration)
if !allowed {
RecordWebhookDenial(m.resourceType, "DELETE", err)
}
return warnings, err
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment