Unverified Commit de6e4f30 authored by Julien Mancuso's avatar Julien Mancuso Committed by GitHub
Browse files

feat: add Topology Aware Scheduling support (#6375)


Signed-off-by: default avatarJulien Mancuso <jmancuso@nvidia.com>
parent 23fc8db7
...@@ -345,8 +345,13 @@ def extract_markdown_links( ...@@ -345,8 +345,13 @@ def extract_markdown_links(
# This captures the link text and URL, excluding image links (![text](url)) # This captures the link text and URL, excluding image links (![text](url))
link_pattern = r"(?<!\!)\[([^\]]+)\]\(([^)]+)\)" link_pattern = r"(?<!\!)\[([^\]]+)\]\(([^)]+)\)"
# Pattern to strip inline code spans so regex/code inside backticks
# is not misinterpreted as markdown links.
inline_code_pattern = r"`[^`]+`"
for line_num, line in enumerate(content.split("\n"), 1): for line_num, line in enumerate(content.split("\n"), 1):
matches = re.finditer(link_pattern, line) stripped_line = re.sub(inline_code_pattern, "", line)
matches = re.finditer(link_pattern, stripped_line)
for match in matches: for match in matches:
link_text = match.group(1) link_text = match.group(1)
link_url = match.group(2) link_url = match.group(2)
......
...@@ -54,4 +54,4 @@ generate-helm-docs: helm-docs-install ## Generate README.md from values.yaml and ...@@ -54,4 +54,4 @@ generate-helm-docs: helm-docs-install ## Generate README.md from values.yaml and
.PHONY: helm-docs-clean .PHONY: helm-docs-clean
helm-docs-clean: ## Remove generated helm documentation helm-docs-clean: ## Remove generated helm documentation
@rm -f README.md @rm -f README.md
@echo "Cleaned generated README.md" @echo "Cleaned generated README.md"
\ No newline at end of file
...@@ -11217,6 +11217,21 @@ spec: ...@@ -11217,6 +11217,21 @@ spec:
subComponentType: subComponentType:
description: SubComponentType indicates the sub-role of this component (for example, "prefill"). description: SubComponentType indicates the sub-role of this component (for example, "prefill").
type: string type: string
topologyConstraint:
description: |-
TopologyConstraint for this service. packDomain is required.
When both this and spec.topologyConstraint.packDomain are set, packDomain
must be narrower than or equal to the spec-level packDomain.
properties:
packDomain:
description: |-
PackDomain is the topology domain to pack pods within. Must match a
domain defined in the referenced ClusterTopology CR.
pattern: ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$
type: string
required:
- packDomain
type: object
volumeMounts: volumeMounts:
description: VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component. description: VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component.
items: items:
......
...@@ -11440,6 +11440,21 @@ spec: ...@@ -11440,6 +11440,21 @@ spec:
subComponentType: subComponentType:
description: SubComponentType indicates the sub-role of this component (for example, "prefill"). description: SubComponentType indicates the sub-role of this component (for example, "prefill").
type: string type: string
topologyConstraint:
description: |-
TopologyConstraint for this service. packDomain is required.
When both this and spec.topologyConstraint.packDomain are set, packDomain
must be narrower than or equal to the spec-level packDomain.
properties:
packDomain:
description: |-
PackDomain is the topology domain to pack pods within. Must match a
domain defined in the referenced ClusterTopology CR.
pattern: ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$
type: string
required:
- packDomain
type: object
volumeMounts: volumeMounts:
description: VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component. description: VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component.
items: items:
...@@ -11468,6 +11483,28 @@ spec: ...@@ -11468,6 +11483,28 @@ spec:
description: Services are the services to deploy as part of this deployment. description: Services are the services to deploy as part of this deployment.
maxProperties: 25 maxProperties: 25
type: object type: object
topologyConstraint:
description: |-
TopologyConstraint is the deployment-level topology constraint.
When set, topologyProfile is required and names the ClusterTopology CR to use.
packDomain is optional here — it can be omitted when only services carry constraints.
Services without their own topologyConstraint inherit from this value.
properties:
packDomain:
description: |-
PackDomain is the default topology domain to pack pods within.
Optional — omit when only services carry constraints.
pattern: ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$
type: string
topologyProfile:
description: |-
TopologyProfile is the name of the ClusterTopology CR that defines the
topology hierarchy for this deployment.
minLength: 1
type: string
required:
- topologyProfile
type: object
type: object type: object
status: status:
description: Status reflects the current observed state of this graph deployment. description: Status reflects the current observed state of this graph deployment.
......
...@@ -145,13 +145,6 @@ rules: ...@@ -145,13 +145,6 @@ rules:
- get - get
- patch - patch
- update - update
- apiGroups:
- scheduling.run.ai
resources:
- queues
verbs:
- get
- list
- apiGroups: - apiGroups:
- apps - apps
resources: resources:
...@@ -517,6 +510,46 @@ subjects: ...@@ -517,6 +510,46 @@ subjects:
name: '{{ include "dynamo-operator.fullname" . }}-controller-manager' name: '{{ include "dynamo-operator.fullname" . }}-controller-manager'
namespace: '{{ .Release.Namespace }}' namespace: '{{ .Release.Namespace }}'
--- ---
# ClusterRole for Grove ClusterTopology access
# This is always a ClusterRole since ClusterTopology resources are cluster-scoped
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ include "dynamo-operator.fullname" . }}-{{ .Release.Namespace }}-clustertopology-reader
labels:
app.kubernetes.io/component: rbac
app.kubernetes.io/created-by: dynamo-operator
app.kubernetes.io/part-of: dynamo-operator
{{- include "dynamo-operator.labels" . | nindent 4 }}
rules:
- apiGroups:
- grove.io
resources:
- clustertopologies
verbs:
- get
- list
- watch
---
# ClusterRoleBinding for Grove ClusterTopology access
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {{ include "dynamo-operator.fullname" . }}-{{ .Release.Namespace }}-clustertopology-reader-binding
labels:
app.kubernetes.io/component: rbac
app.kubernetes.io/created-by: dynamo-operator
app.kubernetes.io/part-of: dynamo-operator
{{- include "dynamo-operator.labels" . | nindent 4 }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: {{ include "dynamo-operator.fullname" . }}-{{ .Release.Namespace }}-clustertopology-reader
subjects:
- kind: ServiceAccount
name: '{{ include "dynamo-operator.fullname" . }}-controller-manager'
namespace: '{{ .Release.Namespace }}'
---
# ClusterRole for kai-scheduler queue access # ClusterRole for kai-scheduler queue access
# This is always a ClusterRole since Queue resources are cluster-scoped # This is always a ClusterRole since Queue resources are cluster-scoped
apiVersion: rbac.authorization.k8s.io/v1 apiVersion: rbac.authorization.k8s.io/v1
......
...@@ -143,6 +143,12 @@ type DynamoComponentDeploymentSharedSpec struct { ...@@ -143,6 +143,12 @@ type DynamoComponentDeploymentSharedSpec struct {
// When enabled, pods can be restored from a checkpoint files for faster cold start. // When enabled, pods can be restored from a checkpoint files for faster cold start.
// +optional // +optional
Checkpoint *ServiceCheckpointConfig `json:"checkpoint,omitempty"` Checkpoint *ServiceCheckpointConfig `json:"checkpoint,omitempty"`
// TopologyConstraint for this service. packDomain is required.
// When both this and spec.topologyConstraint.packDomain are set, packDomain
// must be narrower than or equal to the spec-level packDomain.
// +optional
TopologyConstraint *TopologyConstraint `json:"topologyConstraint,omitempty"`
} }
type MultinodeSpec struct { type MultinodeSpec struct {
......
...@@ -84,6 +84,13 @@ type DynamoGraphDeploymentSpec struct { ...@@ -84,6 +84,13 @@ type DynamoGraphDeploymentSpec struct {
// Restart specifies the restart policy for the graph deployment. // Restart specifies the restart policy for the graph deployment.
// +kubebuilder:validation:Optional // +kubebuilder:validation:Optional
Restart *Restart `json:"restart,omitempty"` Restart *Restart `json:"restart,omitempty"`
// TopologyConstraint is the deployment-level topology constraint.
// When set, topologyProfile is required and names the ClusterTopology CR to use.
// packDomain is optional here — it can be omitted when only services carry constraints.
// Services without their own topologyConstraint inherit from this value.
// +optional
TopologyConstraint *SpecTopologyConstraint `json:"topologyConstraint,omitempty"`
} }
type Restart struct { type Restart struct {
...@@ -319,6 +326,19 @@ func (s *DynamoGraphDeployment) AddStatusCondition(condition metav1.Condition) { ...@@ -319,6 +326,19 @@ func (s *DynamoGraphDeployment) AddStatusCondition(condition metav1.Condition) {
s.Status.Conditions = append(s.Status.Conditions, condition) s.Status.Conditions = append(s.Status.Conditions, condition)
} }
// HasAnyTopologyConstraint reports whether a topology constraint is present
// either on the deployment spec itself or on at least one of its services.
// Nil service entries are skipped.
func (s *DynamoGraphDeployment) HasAnyTopologyConstraint() bool {
	// A deployment-level constraint is enough on its own.
	if s.Spec.TopologyConstraint != nil {
		return true
	}
	for _, service := range s.Spec.Services {
		if service == nil || service.TopologyConstraint == nil {
			continue
		}
		return true
	}
	return false
}
// HasAnyMultinodeService reports whether any service in the graph is configured with more than one node. // HasAnyMultinodeService reports whether any service in the graph is configured with more than one node.
func (s *DynamoGraphDeployment) HasAnyMultinodeService() bool { func (s *DynamoGraphDeployment) HasAnyMultinodeService() bool {
for _, svc := range s.Spec.Services { for _, svc := range s.Spec.Services {
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package v1alpha1
import "regexp"
// Condition type and reasons used to report, on a DynamoGraphDeployment's status,
// whether the topology levels required by its constraints can be honoured.
const (
	// ConditionTypeTopologyLevelsAvailable indicates whether the topology levels
	// referenced by the deployment's constraints are available in the cluster topology.
	ConditionTypeTopologyLevelsAvailable = "TopologyLevelsAvailable"
	// ConditionReasonAllTopologyLevelsAvailable indicates all required topology levels
	// are available in the cluster topology.
	// Reported together with condition status True.
	ConditionReasonAllTopologyLevelsAvailable = "AllTopologyLevelsAvailable"
	// ConditionReasonTopologyLevelsUnavailable indicates one or more required topology
	// levels are no longer available.
	// Reported together with condition status False.
	ConditionReasonTopologyLevelsUnavailable = "TopologyLevelsUnavailable"
	// ConditionReasonTopologyDefinitionNotFound indicates the topology definition
	// resource was not found by the framework.
	// Reported together with condition status False.
	ConditionReasonTopologyDefinitionNotFound = "TopologyDefinitionNotFound"
	// ConditionReasonTopologyConditionPending indicates the scheduling framework
	// has not yet reported a topology condition.
	// Reported together with condition status Unknown.
	ConditionReasonTopologyConditionPending = "TopologyConditionPending"
)
// SpecTopologyConstraint defines deployment-level topology placement requirements.
// It carries both the topology profile (which ClusterTopology CR to use) and an
// optional default pack domain that services without their own constraint inherit.
//
// TopologyProfile is mandatory whenever this struct is present: its JSON tag has
// no omitempty and it carries a MinLength=1 validation marker. PackDomain is
// marked optional and is omitted from the serialized form when empty.
type SpecTopologyConstraint struct {
	// TopologyProfile is the name of the ClusterTopology CR that defines the
	// topology hierarchy for this deployment.
	// +kubebuilder:validation:MinLength=1
	TopologyProfile string `json:"topologyProfile"`
	// PackDomain is the default topology domain to pack pods within.
	// Optional — omit when only services carry constraints.
	// +optional
	PackDomain TopologyDomain `json:"packDomain,omitempty"`
}
// TopologyConstraint defines service-level topology placement requirements.
// The topology profile is inherited from the deployment-level SpecTopologyConstraint;
// only the pack domain is specified here.
//
// Unlike SpecTopologyConstraint.PackDomain, the PackDomain field here is required:
// its JSON tag has no omitempty and it carries no +optional marker.
type TopologyConstraint struct {
	// PackDomain is the topology domain to pack pods within. Must match a
	// domain defined in the referenced ClusterTopology CR.
	PackDomain TopologyDomain `json:"packDomain"`
}
// TopologyDomain is a free-form topology level identifier.
// Domain names are defined by the cluster admin in the ClusterTopology CR.
// Common examples: "region", "zone", "datacenter", "block", "rack", "host", "numa".
// Must match `^[a-z0-9]([a-z0-9-]*[a-z0-9])?$` (lowercase alphanumeric,
// may contain hyphens but must not start or end with one).
// +kubebuilder:validation:Pattern=`^[a-z0-9]([a-z0-9-]*[a-z0-9])?$`
type TopologyDomain string

// topologyDomainRegex is the compiled form of the pattern declared in the
// kubebuilder marker on TopologyDomain. It is compiled once at package
// initialisation so callers never pay the compilation cost.
var topologyDomainRegex = regexp.MustCompile(`^[a-z0-9]([a-z0-9-]*[a-z0-9])?$`)

// IsValidTopologyDomainFormat reports whether d is a syntactically valid
// topology domain name: one or more lowercase alphanumeric characters or
// hyphens, neither starting nor ending with a hyphen. The empty string is
// rejected. Only the format is checked, not whether the domain exists in
// any ClusterTopology.
func IsValidTopologyDomainFormat(d TopologyDomain) bool {
	name := string(d)
	return topologyDomainRegex.MatchString(name)
}
...@@ -552,6 +552,11 @@ func (in *DynamoComponentDeploymentSharedSpec) DeepCopyInto(out *DynamoComponent ...@@ -552,6 +552,11 @@ func (in *DynamoComponentDeploymentSharedSpec) DeepCopyInto(out *DynamoComponent
*out = new(ServiceCheckpointConfig) *out = new(ServiceCheckpointConfig)
(*in).DeepCopyInto(*out) (*in).DeepCopyInto(*out)
} }
if in.TopologyConstraint != nil {
in, out := &in.TopologyConstraint, &out.TopologyConstraint
*out = new(TopologyConstraint)
**out = **in
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoComponentDeploymentSharedSpec. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoComponentDeploymentSharedSpec.
...@@ -951,6 +956,11 @@ func (in *DynamoGraphDeploymentSpec) DeepCopyInto(out *DynamoGraphDeploymentSpec ...@@ -951,6 +956,11 @@ func (in *DynamoGraphDeploymentSpec) DeepCopyInto(out *DynamoGraphDeploymentSpec
*out = new(Restart) *out = new(Restart)
(*in).DeepCopyInto(*out) (*in).DeepCopyInto(*out)
} }
if in.TopologyConstraint != nil {
in, out := &in.TopologyConstraint, &out.TopologyConstraint
*out = new(SpecTopologyConstraint)
**out = **in
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentSpec. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoGraphDeploymentSpec.
...@@ -1681,6 +1691,36 @@ func (in *SharedMemorySpec) DeepCopy() *SharedMemorySpec { ...@@ -1681,6 +1691,36 @@ func (in *SharedMemorySpec) DeepCopy() *SharedMemorySpec {
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SpecTopologyConstraint) DeepCopyInto(out *SpecTopologyConstraint) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SpecTopologyConstraint.
func (in *SpecTopologyConstraint) DeepCopy() *SpecTopologyConstraint {
if in == nil {
return nil
}
out := new(SpecTopologyConstraint)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TopologyConstraint) DeepCopyInto(out *TopologyConstraint) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TopologyConstraint.
func (in *TopologyConstraint) DeepCopy() *TopologyConstraint {
if in == nil {
return nil
}
out := new(TopologyConstraint)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *VolumeMount) DeepCopyInto(out *VolumeMount) { func (in *VolumeMount) DeepCopyInto(out *VolumeMount) {
*out = *in *out = *in
......
...@@ -11217,6 +11217,21 @@ spec: ...@@ -11217,6 +11217,21 @@ spec:
subComponentType: subComponentType:
description: SubComponentType indicates the sub-role of this component (for example, "prefill"). description: SubComponentType indicates the sub-role of this component (for example, "prefill").
type: string type: string
topologyConstraint:
description: |-
TopologyConstraint for this service. packDomain is required.
When both this and spec.topologyConstraint.packDomain are set, packDomain
must be narrower than or equal to the spec-level packDomain.
properties:
packDomain:
description: |-
PackDomain is the topology domain to pack pods within. Must match a
domain defined in the referenced ClusterTopology CR.
pattern: ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$
type: string
required:
- packDomain
type: object
volumeMounts: volumeMounts:
description: VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component. description: VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component.
items: items:
......
...@@ -11440,6 +11440,21 @@ spec: ...@@ -11440,6 +11440,21 @@ spec:
subComponentType: subComponentType:
description: SubComponentType indicates the sub-role of this component (for example, "prefill"). description: SubComponentType indicates the sub-role of this component (for example, "prefill").
type: string type: string
topologyConstraint:
description: |-
TopologyConstraint for this service. packDomain is required.
When both this and spec.topologyConstraint.packDomain are set, packDomain
must be narrower than or equal to the spec-level packDomain.
properties:
packDomain:
description: |-
PackDomain is the topology domain to pack pods within. Must match a
domain defined in the referenced ClusterTopology CR.
pattern: ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$
type: string
required:
- packDomain
type: object
volumeMounts: volumeMounts:
description: VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component. description: VolumeMounts references PVCs defined at the top level for volumes to be mounted by the component.
items: items:
...@@ -11468,6 +11483,28 @@ spec: ...@@ -11468,6 +11483,28 @@ spec:
description: Services are the services to deploy as part of this deployment. description: Services are the services to deploy as part of this deployment.
maxProperties: 25 maxProperties: 25
type: object type: object
topologyConstraint:
description: |-
TopologyConstraint is the deployment-level topology constraint.
When set, topologyProfile is required and names the ClusterTopology CR to use.
packDomain is optional here — it can be omitted when only services carry constraints.
Services without their own topologyConstraint inherit from this value.
properties:
packDomain:
description: |-
PackDomain is the default topology domain to pack pods within.
Optional — omit when only services carry constraints.
pattern: ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$
type: string
topologyProfile:
description: |-
TopologyProfile is the name of the ClusterTopology CR that defines the
topology hierarchy for this deployment.
minLength: 1
type: string
required:
- topologyProfile
type: object
type: object type: object
status: status:
description: Status reflects the current observed state of this graph deployment. description: Status reflects the current observed state of this graph deployment.
......
...@@ -118,6 +118,14 @@ rules: ...@@ -118,6 +118,14 @@ rules:
- patch - patch
- update - update
- watch - watch
- apiGroups:
- grove.io
resources:
- clustertopologies
verbs:
- get
- list
- watch
- apiGroups: - apiGroups:
- grove.io - grove.io
resources: resources:
......
...@@ -23,8 +23,10 @@ import ( ...@@ -23,8 +23,10 @@ import (
"sort" "sort"
"strings" "strings"
groveconstants "github.com/ai-dynamo/grove/operator/api/common/constants"
grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint" "github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/discovery" "github.com/ai-dynamo/dynamo/deploy/operator/internal/discovery"
...@@ -85,6 +87,7 @@ type DynamoGraphDeploymentReconciler struct { ...@@ -85,6 +87,7 @@ type DynamoGraphDeploymentReconciler struct {
// +kubebuilder:rbac:groups=grove.io,resources=podcliquesets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=grove.io,resources=podcliquesets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch // +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch // +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=grove.io,resources=clustertopologies,verbs=get;list;watch
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list // +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list
// +kubebuilder:rbac:groups=inference.networking.k8s.io,resources=inferencepools,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=inference.networking.k8s.io,resources=inferencepools,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
...@@ -143,6 +146,8 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr ...@@ -143,6 +146,8 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
if err == nil { if err == nil {
dynamoDeployment.Status.ObservedGeneration = dynamoDeployment.Generation dynamoDeployment.Status.ObservedGeneration = dynamoDeployment.Generation
} }
// Propagate topology condition from framework (e.g., Grove PCS) to DGD status
r.propagateTopologyCondition(ctx, dynamoDeployment)
updateErr := r.Status().Update(ctx, dynamoDeployment) updateErr := r.Status().Update(ctx, dynamoDeployment)
if updateErr != nil { if updateErr != nil {
...@@ -418,6 +423,75 @@ func (r *DynamoGraphDeploymentReconciler) getUpdatedInProgressForGrove(ctx conte ...@@ -418,6 +423,75 @@ func (r *DynamoGraphDeploymentReconciler) getUpdatedInProgressForGrove(ctx conte
return updatedInProgress return updatedInProgress
} }
// propagateTopologyCondition reads the PCS topology condition from Grove and maps it
// to a TopologyLevelsAvailable condition on the DGD. This is a no-op when no
// topology constraints are set or when the Grove pathway is not in use.
//
// The PodCliqueSet is looked up under the DGD's own name and namespace. The mapping is:
//
//   - PCS has no TopologyLevelsUnavailable condition -> Unknown / TopologyConditionPending
//   - PCS TopologyLevelsUnavailable is True          -> False / TopologyLevelsUnavailable,
//     or False / TopologyDefinitionNotFound when Grove's reason is ClusterTopologyNotFound
//   - any other PCS status                           -> True / AllTopologyLevelsAvailable
//
// A warning event is recorded when the DGD condition becomes False or when its reason
// or message changes while False. The function is best-effort: it only mutates
// dgd.Status in memory and never returns an error; a missing PCS is ignored silently
// and any other read failure is logged at verbosity 1.
func (r *DynamoGraphDeploymentReconciler) propagateTopologyCondition(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) {
	if !dgd.HasAnyTopologyConstraint() || !r.isGrovePathway(dgd) {
		return
	}

	logger := log.FromContext(ctx)

	pcs := &grovev1alpha1.PodCliqueSet{}
	if err := r.Client.Get(ctx, types.NamespacedName{Name: dgd.Name, Namespace: dgd.Namespace}, pcs); err != nil {
		if !errors.IsNotFound(err) {
			logger.V(1).Info("failed to read PCS for topology condition propagation", "error", err)
		}
		return
	}

	// Look for Grove's TopologyLevelsUnavailable condition on the PCS.
	groveTopoCond := meta.FindStatusCondition(pcs.Status.Conditions, groveconstants.ConditionTopologyLevelsUnavailable)

	// The condition currently recorded on the DGD (if any). It is used both to
	// de-duplicate warning events and to keep LastTransitionTime stable.
	prev := meta.FindStatusCondition(dgd.Status.Conditions, nvidiacomv1alpha1.ConditionTypeTopologyLevelsAvailable)

	dynamoCond := metav1.Condition{
		Type: nvidiacomv1alpha1.ConditionTypeTopologyLevelsAvailable,
	}

	switch {
	case groveTopoCond == nil:
		// No topology condition from Grove yet — don't assume healthy.
		dynamoCond.Status = metav1.ConditionUnknown
		dynamoCond.Reason = nvidiacomv1alpha1.ConditionReasonTopologyConditionPending
		dynamoCond.Message = "Waiting for topology condition from the scheduling framework"

	case groveTopoCond.Status == metav1.ConditionTrue:
		// Grove reports topology levels are unavailable.
		reason := nvidiacomv1alpha1.ConditionReasonTopologyLevelsUnavailable
		if groveTopoCond.Reason == groveconstants.ConditionReasonClusterTopologyNotFound {
			reason = nvidiacomv1alpha1.ConditionReasonTopologyDefinitionNotFound
		}
		dynamoCond.Status = metav1.ConditionFalse
		dynamoCond.Reason = reason
		dynamoCond.Message = groveTopoCond.Message

		// Only log and emit an event when something observable changed, so a
		// persistently unavailable topology does not produce an event per reconcile.
		if prev == nil || prev.Status != metav1.ConditionFalse || prev.Reason != reason || prev.Message != groveTopoCond.Message {
			logger.Info("Topology constraints no longer enforced", "reason", reason, "message", groveTopoCond.Message)
			r.Recorder.Eventf(dgd, corev1.EventTypeWarning, reason, "Topology constraints no longer enforced: %s", groveTopoCond.Message)
		}

	default:
		// Grove's TopologyLevelsUnavailable is False → all levels available.
		dynamoCond.Status = metav1.ConditionTrue
		dynamoCond.Reason = nvidiacomv1alpha1.ConditionReasonAllTopologyLevelsAvailable
		dynamoCond.Message = "All required topology levels are available in the cluster topology"
	}

	// LastTransitionTime records when the status last changed, so keep the previous
	// timestamp while the status is unchanged. Stamping "now" on every reconcile
	// would make the condition look like it flaps and would alter the status on
	// every pass even when nothing changed.
	if prev != nil && prev.Status == dynamoCond.Status && !prev.LastTransitionTime.IsZero() {
		dynamoCond.LastTransitionTime = prev.LastTransitionTime
	} else {
		dynamoCond.LastTransitionTime = metav1.Now()
	}

	dgd.AddStatusCondition(dynamoCond)
}
func isRestartAlreadyProcessed(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) bool { func isRestartAlreadyProcessed(dgd *nvidiacomv1alpha1.DynamoGraphDeployment) bool {
if dgd.Spec.Restart == nil || dgd.Spec.Restart.ID == "" { if dgd.Spec.Restart == nil || dgd.Spec.Restart.ID == "" {
return true return true
......
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint" "github.com/ai-dynamo/dynamo/deploy/operator/internal/checkpoint"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts" commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common" "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
groveconstants "github.com/ai-dynamo/grove/operator/api/common/constants"
grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
"github.com/onsi/gomega" "github.com/onsi/gomega"
autoscalingv1 "k8s.io/api/autoscaling/v1" autoscalingv1 "k8s.io/api/autoscaling/v1"
...@@ -2383,3 +2384,230 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) { ...@@ -2383,3 +2384,230 @@ func Test_reconcileDynamoComponentsDeployments(t *testing.T) {
}) })
} }
} }
// TestPropagateTopologyCondition verifies how propagateTopologyCondition maps the
// Grove PodCliqueSet topology condition onto the DGD's TopologyLevelsAvailable
// condition, and how many warning events it records.
//
// Each case may seed the DGD with existingConditions (to exercise event
// de-duplication) and may provide a PCS to be served by the fake client. When
// wantCondition is false the test only asserts that no TopologyLevelsAvailable
// condition was added; otherwise it checks status, reason and the event count.
func TestPropagateTopologyCondition(t *testing.T) {
	tests := []struct {
		name               string
		dgd                *v1alpha1.DynamoGraphDeployment
		existingConditions []metav1.Condition
		pcs                *grovev1alpha1.PodCliqueSet
		groveEnabled       bool
		wantCondition      bool
		wantStatus         metav1.ConditionStatus
		wantReason         string
		wantEventCount     int
	}{
		{
			name: "no topology constraints - no condition added",
			dgd: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"worker": {},
					},
				},
			},
			groveEnabled:  true,
			wantCondition: false,
		},
		{
			name: "topology set but Grove not enabled - no condition added",
			dgd: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test", Namespace: "default",
					Annotations: map[string]string{commonconsts.KubeAnnotationEnableGrove: "false"},
				},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{TopologyProfile: "test-topology", PackDomain: v1alpha1.TopologyDomain("rack")},
				},
			},
			groveEnabled:  false,
			wantCondition: false,
		},
		{
			name: "topology set, PCS has no topology condition - unknown",
			dgd: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{TopologyProfile: "test-topology", PackDomain: v1alpha1.TopologyDomain("rack")},
				},
			},
			pcs: &grovev1alpha1.PodCliqueSet{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Status:     grovev1alpha1.PodCliqueSetStatus{},
			},
			groveEnabled:  true,
			wantCondition: true,
			wantStatus:    metav1.ConditionUnknown,
			wantReason:    v1alpha1.ConditionReasonTopologyConditionPending,
		},
		{
			name: "PCS reports TopologyLevelsUnavailable=True with ClusterTopologyLevelsUnavailable",
			dgd: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{TopologyProfile: "test-topology", PackDomain: v1alpha1.TopologyDomain("rack")},
				},
			},
			pcs: &grovev1alpha1.PodCliqueSet{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Status: grovev1alpha1.PodCliqueSetStatus{
					Conditions: []metav1.Condition{
						{
							Type:    groveconstants.ConditionTopologyLevelsUnavailable,
							Status:  metav1.ConditionTrue,
							Reason:  groveconstants.ConditionReasonTopologyLevelsUnavailable,
							Message: "Topology level 'rack' is no longer available",
						},
					},
				},
			},
			groveEnabled:   true,
			wantCondition:  true,
			wantStatus:     metav1.ConditionFalse,
			wantReason:     v1alpha1.ConditionReasonTopologyLevelsUnavailable,
			wantEventCount: 1,
		},
		{
			// The DGD already carries an identical False condition, so the
			// warning event must not be emitted a second time.
			name: "unchanged unavailable condition does not emit a duplicate event",
			dgd: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{TopologyProfile: "test-topology", PackDomain: v1alpha1.TopologyDomain("rack")},
				},
			},
			existingConditions: []metav1.Condition{
				{
					Type:    v1alpha1.ConditionTypeTopologyLevelsAvailable,
					Status:  metav1.ConditionFalse,
					Reason:  v1alpha1.ConditionReasonTopologyLevelsUnavailable,
					Message: "Topology level 'rack' is no longer available",
				},
			},
			pcs: &grovev1alpha1.PodCliqueSet{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Status: grovev1alpha1.PodCliqueSetStatus{
					Conditions: []metav1.Condition{
						{
							Type:    groveconstants.ConditionTopologyLevelsUnavailable,
							Status:  metav1.ConditionTrue,
							Reason:  groveconstants.ConditionReasonTopologyLevelsUnavailable,
							Message: "Topology level 'rack' is no longer available",
						},
					},
				},
			},
			groveEnabled:   true,
			wantCondition:  true,
			wantStatus:     metav1.ConditionFalse,
			wantReason:     v1alpha1.ConditionReasonTopologyLevelsUnavailable,
			wantEventCount: 0,
		},
		{
			name: "PCS reports TopologyLevelsUnavailable=True with ClusterTopologyNotFound",
			dgd: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{TopologyProfile: "test-topology", PackDomain: v1alpha1.TopologyDomain("rack")},
				},
			},
			pcs: &grovev1alpha1.PodCliqueSet{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Status: grovev1alpha1.PodCliqueSetStatus{
					Conditions: []metav1.Condition{
						{
							Type:    groveconstants.ConditionTopologyLevelsUnavailable,
							Status:  metav1.ConditionTrue,
							Reason:  groveconstants.ConditionReasonClusterTopologyNotFound,
							Message: "ClusterTopology 'default' not found",
						},
					},
				},
			},
			groveEnabled:   true,
			wantCondition:  true,
			wantStatus:     metav1.ConditionFalse,
			wantReason:     v1alpha1.ConditionReasonTopologyDefinitionNotFound,
			wantEventCount: 1,
		},
		{
			name: "PCS reports TopologyLevelsUnavailable=False - all levels available",
			dgd: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{TopologyProfile: "test-topology", PackDomain: v1alpha1.TopologyDomain("rack")},
				},
			},
			pcs: &grovev1alpha1.PodCliqueSet{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Status: grovev1alpha1.PodCliqueSetStatus{
					Conditions: []metav1.Condition{
						{
							Type:    groveconstants.ConditionTopologyLevelsUnavailable,
							Status:  metav1.ConditionFalse,
							Reason:  groveconstants.ConditionReasonAllTopologyLevelsAvailable,
							Message: "All topology levels available",
						},
					},
				},
			},
			groveEnabled:  true,
			wantCondition: true,
			wantStatus:    metav1.ConditionTrue,
			wantReason:    v1alpha1.ConditionReasonAllTopologyLevelsAvailable,
		},
		{
			name: "service-only topology constraint triggers condition propagation",
			dgd: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{TopologyProfile: "test-topology"},
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"worker": {
							TopologyConstraint: &v1alpha1.TopologyConstraint{PackDomain: v1alpha1.TopologyDomain("rack")},
						},
					},
				},
			},
			pcs: &grovev1alpha1.PodCliqueSet{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Status:     grovev1alpha1.PodCliqueSetStatus{},
			},
			groveEnabled:  true,
			wantCondition: true,
			wantStatus:    metav1.ConditionUnknown,
			wantReason:    v1alpha1.ConditionReasonTopologyConditionPending,
		},
		{
			name: "PCS not found yet - no condition added",
			dgd: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{TopologyProfile: "test-topology", PackDomain: v1alpha1.TopologyDomain("rack")},
				},
			},
			pcs:           nil,
			groveEnabled:  true,
			wantCondition: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			g := gomega.NewGomegaWithT(t)

			s := scheme.Scheme
			err := v1alpha1.AddToScheme(s)
			g.Expect(err).NotTo(gomega.HaveOccurred())
			err = grovev1alpha1.AddToScheme(s)
			g.Expect(err).NotTo(gomega.HaveOccurred())

			objs := []client.Object{}
			if tt.pcs != nil {
				objs = append(objs, tt.pcs)
			}

			fakeClient := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build()
			// Buffered so Eventf never blocks; the channel is drained below.
			recorder := record.NewFakeRecorder(10)

			reconciler := &DynamoGraphDeploymentReconciler{
				Client:   fakeClient,
				Recorder: recorder,
				RuntimeConfig: &controller_common.RuntimeConfig{
					GroveEnabled: tt.groveEnabled,
				},
			}

			// Seed the DGD with any condition it is supposed to carry already.
			tt.dgd.Status.Conditions = append(tt.dgd.Status.Conditions, tt.existingConditions...)

			ctx := context.Background()
			reconciler.propagateTopologyCondition(ctx, tt.dgd)

			var topoCond *metav1.Condition
			for i := range tt.dgd.Status.Conditions {
				if tt.dgd.Status.Conditions[i].Type == v1alpha1.ConditionTypeTopologyLevelsAvailable {
					topoCond = &tt.dgd.Status.Conditions[i]
					break
				}
			}

			if !tt.wantCondition {
				g.Expect(topoCond).To(gomega.BeNil(), "expected no TopologyLevelsAvailable condition")
				return
			}

			g.Expect(topoCond).NotTo(gomega.BeNil(), "expected TopologyLevelsAvailable condition to be set")
			g.Expect(topoCond.Status).To(gomega.Equal(tt.wantStatus))
			g.Expect(topoCond.Reason).To(gomega.Equal(tt.wantReason))

			// Closing the channel lets the range below terminate once every
			// recorded event has been counted.
			close(recorder.Events)
			eventCount := 0
			for range recorder.Events {
				eventCount++
			}
			g.Expect(eventCount).To(gomega.Equal(tt.wantEventCount))
		})
	}
}
...@@ -1378,6 +1378,10 @@ func GenerateGrovePodCliqueSet( ...@@ -1378,6 +1378,10 @@ func GenerateGrovePodCliqueSet(
gangSet.Spec.Template.TerminationDelay = &operatorConfig.Orchestrators.Grove.TerminationDelay gangSet.Spec.Template.TerminationDelay = &operatorConfig.Orchestrators.Grove.TerminationDelay
} }
// Inject deployment-level topology constraint (PCS template).
// specToGroveTopologyConstraint returns nil when input is nil, so this is a no-op without TAS.
gangSet.Spec.Template.TopologyConstraint = specToGroveTopologyConstraint(dynamoDeployment.Spec.TopologyConstraint)
// Validate kai-scheduler queue once if kai-scheduler is enabled // Validate kai-scheduler queue once if kai-scheduler is enabled
var validatedQueueName string var validatedQueueName string
if runtimeConfig.GroveEnabled && runtimeConfig.KaiSchedulerEnabled { if runtimeConfig.GroveEnabled && runtimeConfig.KaiSchedulerEnabled {
...@@ -1444,6 +1448,13 @@ func GenerateGrovePodCliqueSet( ...@@ -1444,6 +1448,13 @@ func GenerateGrovePodCliqueSet(
PodSpec: *podSpec, PodSpec: *podSpec,
}, },
} }
// For single-node services, set topology constraint directly on the clique.
// For multinode services, the constraint goes on the PCSG instead;
// child cliques inherit from PCSG and should NOT have explicit constraints.
if !isMultinode {
clique.TopologyConstraint = toGroveTopologyConstraint(component.TopologyConstraint)
}
labels, err := generateLabels(component, dynamoDeployment, serviceName) labels, err := generateLabels(component, dynamoDeployment, serviceName)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to generate labels: %w", err) return nil, fmt.Errorf("failed to generate labels: %w", err)
...@@ -1485,10 +1496,11 @@ func GenerateGrovePodCliqueSet( ...@@ -1485,10 +1496,11 @@ func GenerateGrovePodCliqueSet(
if isMultinode { if isMultinode {
scalingGroups = append(scalingGroups, grovev1alpha1.PodCliqueScalingGroupConfig{ scalingGroups = append(scalingGroups, grovev1alpha1.PodCliqueScalingGroupConfig{
Name: strings.ToLower(serviceName), Name: strings.ToLower(serviceName),
CliqueNames: cliqueNames, CliqueNames: cliqueNames,
Replicas: component.Replicas, Replicas: component.Replicas,
MinAvailable: ptr.To(int32(1)), MinAvailable: ptr.To(int32(1)),
TopologyConstraint: toGroveTopologyConstraint(component.TopologyConstraint),
}) })
} }
} }
......
...@@ -7480,3 +7480,180 @@ func TestGenerateDynamoComponentsDeployments_SpecMetadataPropagation(t *testing. ...@@ -7480,3 +7480,180 @@ func TestGenerateDynamoComponentsDeployments_SpecMetadataPropagation(t *testing.
assert.Equal(t, "frontend", dcd.Spec.Labels[commonconsts.KubeLabelDynamoComponent]) assert.Equal(t, "frontend", dcd.Spec.Labels[commonconsts.KubeLabelDynamoComponent])
assert.Equal(t, dgd.Name, dcd.Spec.Labels[commonconsts.KubeLabelDynamoGraphDeploymentName]) assert.Equal(t, dgd.Name, dcd.Spec.Labels[commonconsts.KubeLabelDynamoGraphDeploymentName])
} }
// TestGenerateGrovePodCliqueSet_TopologyConstraints checks where
// GenerateGrovePodCliqueSet places topology constraints in the generated
// PodCliqueSet:
//   - the deployment-level constraint is expected on the PCS template,
//   - a single-node service's constraint is expected on its clique,
//   - a multinode service's constraint is expected on its scaling group (PCSG)
//     while the leader/worker cliques carry none.
//
// Both the clique set and the PCSG set are compared exhaustively: unexpected
// as well as missing entries fail the test.
func TestGenerateGrovePodCliqueSet_TopologyConstraints(t *testing.T) {
	secretsRetriever := &mockSecretsRetriever{}
	operatorConfig := &configv1alpha1.OperatorConfiguration{}

	tests := []struct {
		name              string
		deployment        *v1alpha1.DynamoGraphDeployment
		wantPCSTemplateTC *grovev1alpha1.TopologyConstraint
		wantCliqueTC      map[string]*grovev1alpha1.TopologyConstraint // clique name -> expected TC
		wantPCSGTC        map[string]*grovev1alpha1.TopologyConstraint // pcsg name -> expected TC
		wantPCSGCount     int
	}{
		{
			name: "no topology constraints - PCS has no TC, cliques have no TC",
			deployment: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-deploy",
					Namespace: "default",
				},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Worker": {
							ComponentType: commonconsts.ComponentTypeWorker,
							Replicas:      ptr.To(int32(2)),
						},
					},
				},
			},
			wantPCSTemplateTC: nil,
			wantCliqueTC:      map[string]*grovev1alpha1.TopologyConstraint{"worker": nil},
			wantPCSGCount:     0,
		},
		{
			name: "single-node service with topology constraints - TC on PCS template and clique",
			deployment: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-deploy",
					Namespace: "default",
				},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{
						TopologyProfile: "test-topology",
						PackDomain:      v1alpha1.TopologyDomain("zone"),
					},
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Worker": {
							ComponentType: commonconsts.ComponentTypeWorker,
							Replicas:      ptr.To(int32(2)),
							TopologyConstraint: &v1alpha1.TopologyConstraint{
								PackDomain: v1alpha1.TopologyDomain("rack"),
							},
						},
					},
				},
			},
			wantPCSTemplateTC: &grovev1alpha1.TopologyConstraint{
				PackDomain: grovev1alpha1.TopologyDomain("zone"),
			},
			wantCliqueTC: map[string]*grovev1alpha1.TopologyConstraint{
				"worker": {PackDomain: grovev1alpha1.TopologyDomain("rack")},
			},
			wantPCSGCount: 0,
		},
		{
			name: "multinode service with topology constraints - TC on PCS template and PCSG, not clique",
			deployment: &v1alpha1.DynamoGraphDeployment{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-deploy",
					Namespace: "default",
				},
				Spec: v1alpha1.DynamoGraphDeploymentSpec{
					TopologyConstraint: &v1alpha1.SpecTopologyConstraint{
						TopologyProfile: "test-topology",
						PackDomain:      v1alpha1.TopologyDomain("zone"),
					},
					Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
						"Worker": {
							ComponentType: commonconsts.ComponentTypeWorker,
							Replicas:      ptr.To(int32(2)),
							Multinode: &v1alpha1.MultinodeSpec{
								NodeCount: 4,
							},
							TopologyConstraint: &v1alpha1.TopologyConstraint{
								PackDomain: v1alpha1.TopologyDomain("block"),
							},
						},
					},
				},
			},
			wantPCSTemplateTC: &grovev1alpha1.TopologyConstraint{
				PackDomain: grovev1alpha1.TopologyDomain("zone"),
			},
			wantCliqueTC: map[string]*grovev1alpha1.TopologyConstraint{
				"worker-ldr": nil,
				"worker-wkr": nil,
			},
			wantPCSGTC: map[string]*grovev1alpha1.TopologyConstraint{
				"worker": {PackDomain: grovev1alpha1.TopologyDomain("block")},
			},
			wantPCSGCount: 1,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			pcs, err := GenerateGrovePodCliqueSet(
				context.Background(),
				tt.deployment,
				operatorConfig,
				&controller_common.RuntimeConfig{},
				secretsRetriever,
				&RestartState{},
				nil,
				nil,
			)
			// assert.* only records a failure and keeps going, so stop here:
			// everything below dereferences pcs and would panic on nil.
			if !assert.NoError(t, err) || !assert.NotNil(t, pcs) {
				return
			}

			// Verify PCS template-level TopologyConstraint
			if tt.wantPCSTemplateTC == nil {
				assert.Nil(t, pcs.Spec.Template.TopologyConstraint, "expected PCS template TopologyConstraint to be nil")
			} else if assert.NotNil(t, pcs.Spec.Template.TopologyConstraint, "expected PCS template TopologyConstraint to be set") {
				// Only compare fields once the pointer is known to be non-nil.
				assert.Equal(t, tt.wantPCSTemplateTC.PackDomain, pcs.Spec.Template.TopologyConstraint.PackDomain)
			}

			// Verify clique-level TopologyConstraints (exhaustive)
			assert.Equal(t, len(tt.wantCliqueTC), len(pcs.Spec.Template.Cliques), "clique count mismatch")
			actualCliqueNames := make(map[string]struct{}, len(pcs.Spec.Template.Cliques))
			for _, clique := range pcs.Spec.Template.Cliques {
				actualCliqueNames[clique.Name] = struct{}{}
				expectedTC, ok := tt.wantCliqueTC[clique.Name]
				if !ok {
					t.Errorf("unexpected clique %q in PCS", clique.Name)
					continue
				}
				if expectedTC == nil {
					assert.Nil(t, clique.TopologyConstraint, "clique %q: expected nil TopologyConstraint", clique.Name)
				} else if assert.NotNil(t, clique.TopologyConstraint, "clique %q: expected non-nil TopologyConstraint", clique.Name) {
					assert.Equal(t, expectedTC.PackDomain, clique.TopologyConstraint.PackDomain, "clique %q: packDomain mismatch", clique.Name)
				}
			}
			for expectedName := range tt.wantCliqueTC {
				if _, found := actualCliqueNames[expectedName]; !found {
					t.Errorf("expected clique %q not found in PCS", expectedName)
				}
			}

			// Verify PCSG-level TopologyConstraints (exhaustive)
			assert.Equal(t, tt.wantPCSGCount, len(pcs.Spec.Template.PodCliqueScalingGroupConfigs), "PCSG count mismatch")
			actualPCSGNames := make(map[string]struct{}, len(pcs.Spec.Template.PodCliqueScalingGroupConfigs))
			for _, pcsg := range pcs.Spec.Template.PodCliqueScalingGroupConfigs {
				actualPCSGNames[pcsg.Name] = struct{}{}
				// Cases without a wantPCSGTC map rely on the count check above.
				if tt.wantPCSGTC == nil {
					continue
				}
				expectedTC, ok := tt.wantPCSGTC[pcsg.Name]
				if !ok {
					t.Errorf("unexpected PCSG %q in PCS", pcsg.Name)
					continue
				}
				if expectedTC == nil {
					assert.Nil(t, pcsg.TopologyConstraint, "PCSG %q: expected nil TopologyConstraint", pcsg.Name)
				} else if assert.NotNil(t, pcsg.TopologyConstraint, "PCSG %q: expected non-nil TopologyConstraint", pcsg.Name) {
					assert.Equal(t, expectedTC.PackDomain, pcsg.TopologyConstraint.PackDomain, "PCSG %q: packDomain mismatch", pcsg.Name)
				}
			}
			for expectedName := range tt.wantPCSGTC {
				if _, found := actualPCSGNames[expectedName]; !found {
					t.Errorf("expected PCSG %q not found in PCS", expectedName)
				}
			}
		})
	}
}
...@@ -239,6 +239,28 @@ func CheckPCSGReady(ctx context.Context, client client.Client, resourceName, nam ...@@ -239,6 +239,28 @@ func CheckPCSGReady(ctx context.Context, client client.Client, resourceName, nam
return true, "", serviceStatus return true, "", serviceStatus
} }
// specToGroveTopologyConstraint maps a deployment-level SpecTopologyConstraint
// onto its Grove counterpart. Only PackDomain is carried over.
//
// It returns nil when tc is nil or when tc has no PackDomain, so callers can
// assign the result unconditionally.
func specToGroveTopologyConstraint(tc *v1alpha1.SpecTopologyConstraint) *grovev1alpha1.TopologyConstraint {
	if tc != nil && tc.PackDomain != "" {
		groveTC := grovev1alpha1.TopologyConstraint{
			PackDomain: grovev1alpha1.TopologyDomain(tc.PackDomain),
		}
		return &groveTC
	}
	return nil
}
// toGroveTopologyConstraint maps a service-level TopologyConstraint onto its
// Grove counterpart by copying PackDomain.
//
// It returns nil when tc is nil or when tc has no PackDomain.
func toGroveTopologyConstraint(tc *v1alpha1.TopologyConstraint) *grovev1alpha1.TopologyConstraint {
	if tc != nil && tc.PackDomain != "" {
		groveTC := grovev1alpha1.TopologyConstraint{
			PackDomain: grovev1alpha1.TopologyDomain(tc.PackDomain),
		}
		return &groveTC
	}
	return nil
}
// resolveKaiSchedulerQueueName extracts the queue name from annotations or returns default // resolveKaiSchedulerQueueName extracts the queue name from annotations or returns default
// This is the shared logic between DetermineKaiSchedulerQueue and ResolveKaiSchedulerQueue // This is the shared logic between DetermineKaiSchedulerQueue and ResolveKaiSchedulerQueue
func resolveKaiSchedulerQueueName(annotations map[string]string) string { func resolveKaiSchedulerQueueName(annotations map[string]string) string {
......
...@@ -28,7 +28,10 @@ import ( ...@@ -28,7 +28,10 @@ import (
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts" "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
internalwebhook "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook" internalwebhook "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook"
grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
authenticationv1 "k8s.io/api/authentication/v1" authenticationv1 "k8s.io/api/authentication/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission" "sigs.k8s.io/controller-runtime/pkg/webhook/admission"
) )
...@@ -65,9 +68,10 @@ func NewDynamoGraphDeploymentValidatorWithManager(deployment *nvidiacomv1alpha1. ...@@ -65,9 +68,10 @@ func NewDynamoGraphDeploymentValidatorWithManager(deployment *nvidiacomv1alpha1.
} }
} }
// Validate performs stateless validation on the DynamoGraphDeployment. // Validate performs validation on the DynamoGraphDeployment.
// Context is required for operations that may need to query the cluster (e.g., CRD checks). // The ClusterTopology CRD check only runs on CREATE (Generation == 1). On UPDATE
// Returns warnings and error. // (Generation > 1) it is skipped because TAS fields are immutable — domains were
// already validated at creation time and the topology may have changed since.
func (v *DynamoGraphDeploymentValidator) Validate(ctx context.Context) (admission.Warnings, error) { func (v *DynamoGraphDeploymentValidator) Validate(ctx context.Context) (admission.Warnings, error) {
// Validate that at least one service is specified // Validate that at least one service is specified
if len(v.deployment.Spec.Services) == 0 { if len(v.deployment.Spec.Services) == 0 {
...@@ -89,6 +93,11 @@ func (v *DynamoGraphDeploymentValidator) Validate(ctx context.Context) (admissio ...@@ -89,6 +93,11 @@ func (v *DynamoGraphDeploymentValidator) Validate(ctx context.Context) (admissio
return nil, err return nil, err
} }
// Validate topology constraints
if err := v.validateTopologyConstraints(ctx); err != nil {
return nil, err
}
var allWarnings admission.Warnings var allWarnings admission.Warnings
// Validate each service // Validate each service
...@@ -161,6 +170,11 @@ func (v *DynamoGraphDeploymentValidator) validateImmutableFields(old *nvidiacomv ...@@ -161,6 +170,11 @@ func (v *DynamoGraphDeploymentValidator) validateImmutableFields(old *nvidiacomv
} }
} }
// Validate topology constraint immutability
if err := v.validateTopologyConstraintImmutability(old); err != nil {
errs = append(errs, err)
}
return errors.Join(errs...) return errors.Join(errs...)
} }
...@@ -455,6 +469,254 @@ func (v *DynamoGraphDeploymentValidator) validateAnnotations() error { ...@@ -455,6 +469,254 @@ func (v *DynamoGraphDeploymentValidator) validateAnnotations() error {
return errors.Join(errs...) return errors.Join(errs...)
} }
// validateTopologyConstraints checks the topology constraint settings at the
// spec level and on every service, and returns all problems joined into one
// error (nil when there are none).
//
// Rules enforced, in order:
//  1. A spec-level packDomain, if present, must have a valid format.
//  2. A service-level topologyConstraint must have a packDomain with a valid format.
//  3. If no constraint exists anywhere, validation passes immediately.
//  4. Otherwise spec.topologyConstraint must exist and name a topologyProfile.
//  5. If the spec-level packDomain is empty, every service needs its own
//     topologyConstraint.
//  6. When nothing above failed, a manager is available, the deployment is on
//     the Grove pathway and Generation == 1, the domains are additionally
//     checked against the ClusterTopology resource.
//
// Services are visited in sorted name order so error output is deterministic.
func (v *DynamoGraphDeploymentValidator) validateTopologyConstraints(ctx context.Context) error {
	deployTC := v.deployment.Spec.TopologyConstraint
	services := v.deployment.Spec.Services

	var problems []error

	// Rule 1: the spec-level packDomain is optional but must be well-formed.
	if deployTC != nil && deployTC.PackDomain != "" &&
		!nvidiacomv1alpha1.IsValidTopologyDomainFormat(deployTC.PackDomain) {
		problems = append(problems, fmt.Errorf("spec.topologyConstraint.packDomain %q is not a valid topology domain; "+
			"must match ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$", deployTC.PackDomain))
	}

	names := make([]string, 0, len(services))
	for n := range services {
		names = append(names, n)
	}
	sort.Strings(names)

	// Rule 2: per-service checks. Also track whether any constraint exists at all.
	constrained := deployTC != nil
	for _, n := range names {
		svc := services[n]
		if svc == nil || svc.TopologyConstraint == nil {
			continue
		}
		constrained = true

		prefix := fmt.Sprintf("spec.services[%s]", n)
		domain := svc.TopologyConstraint.PackDomain
		switch {
		case domain == "":
			problems = append(problems, fmt.Errorf("%s.topologyConstraint.packDomain is required", prefix))
		case !nvidiacomv1alpha1.IsValidTopologyDomainFormat(domain):
			problems = append(problems, fmt.Errorf("%s.topologyConstraint.packDomain %q is not a valid topology domain; "+
				"must match ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$", prefix, domain))
		}
	}

	// Rule 3: nothing configured, nothing to validate.
	if !constrained {
		return nil
	}

	// Rule 4: a service-level constraint without any spec-level block is an error,
	// and the remaining checks all need the spec-level block.
	if deployTC == nil {
		problems = append(problems, fmt.Errorf("spec.topologyConstraint with topologyProfile is required "+
			"when any topology constraint is set (at spec or service level)"))
		return errors.Join(problems...)
	}
	if deployTC.TopologyProfile == "" {
		problems = append(problems, fmt.Errorf("spec.topologyConstraint.topologyProfile is required "+
			"when any topology constraint is set"))
	}

	// Rule 5: with no spec-level packDomain there is nothing to inherit, so each
	// service has to bring its own constraint.
	if deployTC.PackDomain == "" {
		for _, n := range names {
			if svc := services[n]; svc == nil || svc.TopologyConstraint == nil {
				problems = append(problems, fmt.Errorf("spec.services[%s].topologyConstraint is required "+
					"because spec.topologyConstraint.packDomain is not set; either set a spec-level "+
					"packDomain or provide a topologyConstraint for every service", n))
			}
		}
	}

	// Rule 6: the cluster lookup is skipped when earlier errors exist (it would
	// only add redundant "domain not found" noise) or when no manager is set.
	if len(problems) > 0 || v.mgr == nil {
		return errors.Join(problems...)
	}
	if v.isGrovePathway() && v.deployment.Generation == 1 {
		if err := v.validateTopologyDomainsAgainstGroveClusterTopology(ctx); err != nil {
			problems = append(problems, err)
		}
	}
	return errors.Join(problems...)
}
// validateTopologyDomainsAgainstGroveClusterTopology fetches the Grove
// ClusterTopology named by spec.topologyConstraint.topologyProfile and checks
// the configured pack domains against its levels.
//
// Two kinds of problems are reported, existence errors first and hierarchy
// errors after them:
//   - a spec-level or service-level packDomain that is not one of the
//     topology's levels;
//   - a service packDomain that sits at a lower level index than the spec-level
//     packDomain (index 0 is treated as the broadest level).
//
// It returns nil when the profile name is empty, a descriptive error when the
// ClusterTopology cannot be found or read, and otherwise the joined list of
// problems (nil when there are none). The caller must ensure
// spec.topologyConstraint is non-nil.
func (v *DynamoGraphDeploymentValidator) validateTopologyDomainsAgainstGroveClusterTopology(ctx context.Context) error {
	deployTC := v.deployment.Spec.TopologyConstraint
	profile := deployTC.TopologyProfile
	if profile == "" {
		return nil
	}

	topology := &grovev1alpha1.ClusterTopology{}
	if err := v.mgr.GetClient().Get(ctx, types.NamespacedName{Name: profile}, topology); err != nil {
		if k8serrors.IsNotFound(err) {
			return fmt.Errorf("topology-aware scheduling requires a ClusterTopology resource %q but it was not found; "+
				"ensure the cluster topology is configured per the framework documentation", profile)
		}
		return fmt.Errorf("failed to read ClusterTopology %q for topology validation: %w", profile, err)
	}

	// Position of each domain within the topology's level list.
	levelOf := make(map[string]int, len(topology.Spec.Levels))
	for idx := range topology.Spec.Levels {
		levelOf[string(topology.Spec.Levels[idx].Domain)] = idx
	}

	// Services that carry a non-empty packDomain, in sorted name order so the
	// resulting errors are deterministic.
	type pinnedService struct {
		name   string
		domain nvidiacomv1alpha1.TopologyDomain
	}
	names := make([]string, 0, len(v.deployment.Spec.Services))
	for n := range v.deployment.Spec.Services {
		names = append(names, n)
	}
	sort.Strings(names)
	pinned := make([]pinnedService, 0, len(names))
	for _, n := range names {
		svc := v.deployment.Spec.Services[n]
		if svc == nil || svc.TopologyConstraint == nil || svc.TopologyConstraint.PackDomain == "" {
			continue
		}
		pinned = append(pinned, pinnedService{name: n, domain: svc.TopologyConstraint.PackDomain})
	}

	var problems []error

	// Existence: every configured domain has to be one of the topology's levels.
	requireKnown := func(fieldPath string, domain nvidiacomv1alpha1.TopologyDomain) {
		if _, known := levelOf[string(domain)]; known {
			return
		}
		problems = append(problems, fmt.Errorf("%s: domain %q does not exist in ClusterTopology %q; "+
			"available domains: %v", fieldPath, domain, profile, topologyLevelDomains(topology)))
	}
	if deployTC.PackDomain != "" {
		requireKnown("spec.topologyConstraint.packDomain", deployTC.PackDomain)
	}
	for _, p := range pinned {
		requireKnown(fmt.Sprintf("spec.services[%s].topologyConstraint.packDomain", p.name), p.domain)
	}

	// Hierarchy: only meaningful when a known spec-level domain exists.
	specLevel, specKnown := levelOf[string(deployTC.PackDomain)]
	if deployTC.PackDomain == "" || !specKnown {
		return errors.Join(problems...)
	}
	for _, p := range pinned {
		// Unknown service domains were already reported above; skip them here.
		if svcLevel, known := levelOf[string(p.domain)]; known && svcLevel < specLevel {
			problems = append(problems, fmt.Errorf("spec.services[%s]: topologyConstraint.packDomain %q is broader "+
				"than spec-level %q; service constraints must be equal to or narrower than the "+
				"deployment-level constraint", p.name, p.domain, deployTC.PackDomain))
		}
	}
	return errors.Join(problems...)
}
// topologyLevelDomains collects the domain name of every level in ct and
// returns them sorted alphabetically. The result is intended for error
// messages and does not reflect the hierarchy order of the levels.
func topologyLevelDomains(ct *grovev1alpha1.ClusterTopology) []string {
	names := make([]string, len(ct.Spec.Levels))
	for i := range ct.Spec.Levels {
		names[i] = string(ct.Spec.Levels[i].Domain)
	}
	sort.Strings(names)
	return names
}
// validateTopologyConstraintImmutability rejects any change to topology
// constraints between old and the deployment being validated.
//
// The spec-level constraint is compared as a whole. Service-level constraints
// are compared for every service present in the new spec that also existed in
// old; services that are new in this update are not checked here. Services are
// visited in sorted name order so the joined error is deterministic. Returns
// nil when nothing changed.
func (v *DynamoGraphDeploymentValidator) validateTopologyConstraintImmutability(old *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	var violations []error

	if !specTopologyConstraintsEqual(old.Spec.TopologyConstraint, v.deployment.Spec.TopologyConstraint) {
		violations = append(violations, fmt.Errorf("spec.topologyConstraint is immutable and cannot be added, removed, or changed after creation; "+
			"delete and recreate the DynamoGraphDeployment to change topology constraints"))
	}

	// A nil service entry is treated as "no constraint".
	constraintOf := func(svc *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec) *nvidiacomv1alpha1.TopologyConstraint {
		if svc == nil {
			return nil
		}
		return svc.TopologyConstraint
	}

	names := make([]string, 0, len(v.deployment.Spec.Services))
	for n := range v.deployment.Spec.Services {
		names = append(names, n)
	}
	sort.Strings(names)

	for _, n := range names {
		previous, existed := old.Spec.Services[n]
		if !existed {
			continue
		}
		if topologyConstraintsEqual(constraintOf(previous), constraintOf(v.deployment.Spec.Services[n])) {
			continue
		}
		violations = append(violations, fmt.Errorf("spec.services[%s].topologyConstraint is immutable and cannot be added, removed, or changed after creation; "+
			"delete and recreate the DynamoGraphDeployment to change topology constraints", n))
	}

	return errors.Join(violations...)
}
// specTopologyConstraintsEqual reports whether two spec-level constraints are
// semantically the same: both nil, or both non-nil with matching
// TopologyProfile and PackDomain.
func specTopologyConstraintsEqual(a, b *nvidiacomv1alpha1.SpecTopologyConstraint) bool {
	if a == nil || b == nil {
		// Equal only when both sides are nil.
		return a == b
	}
	if a.TopologyProfile != b.TopologyProfile {
		return false
	}
	return a.PackDomain == b.PackDomain
}
// topologyConstraintsEqual reports whether two service-level constraints are
// semantically the same: both nil, or both non-nil with matching PackDomain.
func topologyConstraintsEqual(a, b *nvidiacomv1alpha1.TopologyConstraint) bool {
	if a == nil || b == nil {
		// Equal only when both sides are nil.
		return a == b
	}
	return a.PackDomain == b.PackDomain
}
func getUnique[T comparable](slice []T) []T { func getUnique[T comparable](slice []T) []T {
seen := make(map[T]struct{}, len(slice)) seen := make(map[T]struct{}, len(slice))
uniqueSlice := make([]T, 0, len(slice)) uniqueSlice := make([]T, 0, len(slice))
......
...@@ -89,10 +89,8 @@ func (h *DynamoGraphDeploymentHandler) ValidateUpdate(ctx context.Context, oldOb ...@@ -89,10 +89,8 @@ func (h *DynamoGraphDeploymentHandler) ValidateUpdate(ctx context.Context, oldOb
return nil, err return nil, err
} }
// Create validator with manager for API group detection and perform validation // Create validator with manager for API group detection and perform validation.
validator := NewDynamoGraphDeploymentValidatorWithManager(newDeployment, h.mgr) validator := NewDynamoGraphDeploymentValidatorWithManager(newDeployment, h.mgr)
// Validate stateless rules
warnings, err := validator.Validate(ctx) warnings, err := validator.Validate(ctx)
if err != nil { if err != nil {
return warnings, err return warnings, err
......
...@@ -838,16 +838,379 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) { ...@@ -838,16 +838,379 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
wantErr: true, wantErr: true,
errMsg: `annotation nvidia.com/dynamo-operator-origin-version has invalid value "not-a-version": must be valid semver`, errMsg: `annotation nvidia.com/dynamo-operator-origin-version has invalid value "not-a-version": must be valid semver`,
}, },
// Topology constraint validation tests
{ {
name: "both annotations valid", name: "no topology constraints is valid (backward compatible)",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
},
},
wantErr: false,
},
{
name: "valid topology constraints with spec and service level",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{ deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-graph", Name: "test-graph",
Namespace: "default", Namespace: "default",
Annotations: map[string]string{ Annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0", consts.KubeAnnotationEnableGrove: "false",
consts.KubeAnnotationVLLMDistributedExecutorBackend: "mp", },
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
PackDomain: nvidiacomv1alpha1.TopologyDomain("zone"),
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("block"),
},
},
"Frontend": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("zone"),
},
},
},
},
},
wantErr: false,
},
{
name: "spec-level with topologyProfile only (no packDomain) is rejected when service lacks constraint",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("rack"),
},
},
"Frontend": {},
},
},
},
wantErr: true,
errContains: true,
errMsg: "spec.services[Frontend].topologyConstraint is required because spec.topologyConstraint.packDomain is not set",
},
{
name: "spec-level set but service has no topology constraint is valid (inherits)",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
PackDomain: nvidiacomv1alpha1.TopologyDomain("zone"),
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {},
},
},
},
wantErr: false,
},
{
name: "invalid packDomain format at spec level",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
PackDomain: nvidiacomv1alpha1.TopologyDomain("INVALID!"),
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("rack"),
},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "is not a valid topology domain",
},
{
name: "service domain equal to spec-level is valid (no hierarchy check without CRD)",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
PackDomain: nvidiacomv1alpha1.TopologyDomain("rack"),
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("rack"),
},
},
},
},
},
wantErr: false,
},
{
name: "mixed: spec-level with some services having constraints and some not",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
PackDomain: nvidiacomv1alpha1.TopologyDomain("zone"),
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("rack"),
},
},
"Frontend": {},
},
},
},
wantErr: false,
},
{
name: "topologyProfile missing at spec level when service has constraint",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("rack"),
},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "spec.topologyConstraint with topologyProfile is required",
},
{
name: "topologyProfile empty at spec level when service has constraint",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("zone"),
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("rack"),
},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "topologyProfile is required",
},
{
name: "service-level topologyConstraint without packDomain is rejected",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
PackDomain: nvidiacomv1alpha1.TopologyDomain("zone"),
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "packDomain is required",
},
{
name: "invalid packDomain format at service level",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
PackDomain: nvidiacomv1alpha1.TopologyDomain("zone"),
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("INVALID!"),
},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "is not a valid topology domain",
},
{
name: "service domain narrower than spec-level is valid",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
PackDomain: nvidiacomv1alpha1.TopologyDomain("zone"),
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("host"),
},
},
},
},
},
wantErr: false,
},
{
name: "no spec packDomain but all services have topology constraint is valid",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
}, },
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("rack"),
},
},
"Frontend": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("zone"),
},
},
},
},
},
wantErr: false,
},
{
name: "no spec packDomain and service missing topology constraint is rejected",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
TopologyConstraint: &nvidiacomv1alpha1.SpecTopologyConstraint{
TopologyProfile: "test-topology",
},
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"Worker": {
TopologyConstraint: &nvidiacomv1alpha1.TopologyConstraint{
PackDomain: nvidiacomv1alpha1.TopologyDomain("rack"),
},
},
"Frontend": {},
},
},
},
wantErr: true,
errContains: true,
errMsg: "spec.services[Frontend].topologyConstraint is required because spec.topologyConstraint.packDomain is not set",
},
{
name: "both annotations valid",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
}, },
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{ Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{ Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
...@@ -1578,8 +1941,8 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) { ...@@ -1578,8 +1941,8 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) {
return return
} }
if tt.wantErr && err.Error() != tt.errMsg { if tt.wantErr && !strings.Contains(err.Error(), tt.errMsg) {
t.Errorf("DynamoGraphDeploymentValidator.ValidateUpdate() error message = %v, want %v", err.Error(), tt.errMsg) t.Errorf("DynamoGraphDeploymentValidator.ValidateUpdate() error message = %v, want to contain %v", err.Error(), tt.errMsg)
} }
if tt.wantWarnings && len(warnings) == 0 { if tt.wantWarnings && len(warnings) == 0 {
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment