Unverified Commit a48672f5 authored by Julien Mancuso's avatar Julien Mancuso Committed by GitHub
Browse files

feat: add inter-pod GMS (#7777)

parent 0d635418
......@@ -770,7 +770,9 @@ func Test_reconcileGroveResources(t *testing.T) {
name string
dgdSpec v1alpha1.DynamoGraphDeploymentSpec
existingGroveResources []client.Object
draEnabled bool
wantReconcileResult ReconcileResult
wantErrSubstring string
}{
{
name: "singular frontend service with 2 replicas - creates a PodClique with 2 replicas - ready",
......@@ -1038,6 +1040,25 @@ func Test_reconcileGroveResources(t *testing.T) {
},
},
},
{
name: "inter-pod GMS failover requires DRA - returns clear error when DRA is disabled",
dgdSpec: v1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*v1alpha1.DynamoComponentDeploymentSharedSpec{
"decode": {
ComponentType: string(commonconsts.ComponentTypeDecode),
Replicas: ptr.To(int32(1)),
Failover: &v1alpha1.FailoverSpec{
Enabled: true,
Mode: v1alpha1.GMSModeInterPod,
NumShadows: 1,
},
},
},
},
draEnabled: false,
wantErrSubstring: "requires DRA",
},
}
for _, tt := range tests {
......@@ -1073,7 +1094,7 @@ func Test_reconcileGroveResources(t *testing.T) {
Client: fakeKubeClient,
Recorder: recorder,
Config: &configv1alpha1.OperatorConfiguration{},
RuntimeConfig: &controller_common.RuntimeConfig{},
RuntimeConfig: &controller_common.RuntimeConfig{DRAEnabled: tt.draEnabled},
ScaleClient: &mockScaleClient{},
DockerSecretRetriever: &mockDockerSecretRetriever{
GetSecretsFunc: func(namespace, imageName string) ([]string, error) {
......@@ -1083,6 +1104,11 @@ func Test_reconcileGroveResources(t *testing.T) {
}
result, err := reconciler.reconcileGroveResources(ctx, dgd, nil, nil)
if tt.wantErrSubstring != "" {
g.Expect(err).To(gomega.HaveOccurred())
g.Expect(err.Error()).To(gomega.ContainSubstring(tt.wantErrSubstring))
return
}
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(result).To(gomega.Equal(tt.wantReconcileResult))
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package controller
import (
"context"
"fmt"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
// Grove labels that together uniquely identify an "engine group" — the set of
// pods (one per rank in multi-node, or a single pod in single-node) that share
// the same pod index within a PCSG replica. When any one of them terminates,
// the whole group must be torn down so Grove can recreate it as a healthy unit.
const (
	// groveLabelPCSG is the pod label key holding the PodCliqueScalingGroup (PCSG) name.
	groveLabelPCSG = "grove.io/podcliquescalinggroup"
	// groveLabelPCSGReplicaIndex is the pod label key holding the PCSG replica index.
	groveLabelPCSGReplicaIndex = "grove.io/podcliquescalinggroup-replica-index"
	// groveLabelPodIndex is the pod label key holding the pod index within its clique.
	groveLabelPodIndex = "grove.io/podclique-pod-index"
)
// FailoverCascadeReconciler watches GMS failover pods (restartPolicy: Never)
// and cascade-deletes all pods in the same engine group when any member
// reaches a terminal phase (Failed or Succeeded). This ensures broken
// distributed inference groups are restarted cleanly by Grove.
//
// Background: GMS (GPU Memory Service) pods run with restartPolicy: Never so
// that Kubernetes does not attempt to restart them in-place — a partial
// restart would leave the distributed inference group in an inconsistent
// state. Instead, this controller detects the terminal pod and deletes the
// entire group. Grove then sees the missing pods and recreates the whole
// group from scratch.
//
// An engine group is identified by three Grove labels:
//   - grove.io/podcliquescalinggroup (PCSG name)
//   - grove.io/podcliquescalinggroup-replica-index (PCSG replica — which copy of the group)
//   - grove.io/podclique-pod-index (pod index within the clique)
//
// Only pods carrying the dynamo failover engine-group-member label are
// considered; see failoverCascadePredicate().
type FailoverCascadeReconciler struct {
	// Client is embedded so Reconcile can call Get and DeleteAllOf directly
	// on the reconciler.
	client.Client
	// Recorder publishes the "FailoverCascade" warning event on the pod that
	// triggered a cascade.
	Recorder record.EventRecorder
}
// NewFailoverCascadeReconciler returns a FailoverCascadeReconciler that uses c
// for API access and recorder for event emission.
func NewFailoverCascadeReconciler(c client.Client, recorder record.EventRecorder) *FailoverCascadeReconciler {
	reconciler := new(FailoverCascadeReconciler)
	reconciler.Client = c
	reconciler.Recorder = recorder
	return reconciler
}
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;delete;deletecollection
// Reconcile handles a single failover-eligible pod that has reached a
// terminal phase (see failoverCascadePredicate) and deletes every pod in the
// same namespace that carries the same engine-group labels.
//
// The request is a no-op (empty result, nil error) when the pod is gone, is
// not in a terminal phase, is already being deleted, has lost the
// engine-group-member label, or is missing any of the three Grove labels.
//
// DeleteAllOf is idempotent, so concurrent reconciles for several pods of the
// same engine group are harmless: the first one removes the group and the
// rest find nothing left to do.
func (r *FailoverCascadeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var (
		done    ctrl.Result
		trigger corev1.Pod
	)
	logger := log.FromContext(ctx)

	if err := r.Get(ctx, req.NamespacedName, &trigger); err != nil {
		if !errors.IsNotFound(err) {
			return done, err
		}
		// The pod vanished before we got here; nothing to cascade.
		return done, nil
	}

	switch {
	case !isTerminalPhase(trigger.Status.Phase):
		// Only Failed/Succeeded pods start a cascade.
		return done, nil
	case trigger.DeletionTimestamp != nil:
		// Another reconcile may already have cascade-deleted this pod between
		// predicate evaluation and now; it still exists but is marked for
		// deletion, so leave it alone.
		return done, nil
	case trigger.Labels[commonconsts.KubeLabelDynamoFailoverEngineGroupMember] != commonconsts.KubeLabelValueTrue:
		// Defensive re-check: the predicate filters on this label, but it can
		// be removed afterwards (e.g. someone manually quarantining a pod).
		// An explicitly unlabeled pod must never trigger a cascade.
		return done, nil
	}

	var (
		groupName    = trigger.Labels[groveLabelPCSG]
		replicaIndex = trigger.Labels[groveLabelPCSGReplicaIndex]
		memberIndex  = trigger.Labels[groveLabelPodIndex]
	)
	if groupName == "" || replicaIndex == "" || memberIndex == "" {
		logger.Info("failover pod missing Grove labels, skipping cascade",
			"pod", trigger.Name,
			groveLabelPCSG, groupName,
			groveLabelPCSGReplicaIndex, replicaIndex,
			groveLabelPodIndex, memberIndex,
		)
		return done, nil
	}

	// Force delete (grace=0) intentionally: the distributed inference group is
	// already broken at this point, so a SIGTERM window for the surviving
	// engines only delays Grove's recreation of the cohort and risks leaving
	// half-torn-down NCCL/CUDA IPC state and stale UDS sockets on the shared
	// hostPath. preStop hooks and the graceful shutdown window are skipped on
	// purpose; do NOT soften this to a positive grace period.
	deleteOpts := []client.DeleteAllOfOption{
		client.InNamespace(trigger.Namespace),
		client.MatchingLabels{
			commonconsts.KubeLabelDynamoFailoverEngineGroupMember: commonconsts.KubeLabelValueTrue,
			groveLabelPCSG:             groupName,
			groveLabelPCSGReplicaIndex: replicaIndex,
			groveLabelPodIndex:         memberIndex,
		},
		client.GracePeriodSeconds(0),
	}
	if err := r.DeleteAllOf(ctx, &corev1.Pod{}, deleteOpts...); err != nil {
		return done, fmt.Errorf("failed to cascade-delete engine group: %w", err)
	}

	logger.Info("cascade-deleted engine group",
		"trigger", trigger.Name,
		"pcsg", groupName,
		"pcsgReplica", replicaIndex,
		"podIndex", memberIndex,
	)
	r.Recorder.Eventf(&trigger, corev1.EventTypeWarning, "FailoverCascade",
		"Pod %s terminated (phase=%s); cascade-deleted engine group (pcsg=%s, replica=%s, index=%s)",
		trigger.Name, trigger.Status.Phase, groupName, replicaIndex, memberIndex,
	)
	return done, nil
}
// SetupWithManager wires the reconciler into mgr as the controller named
// "gms-failover-cascade". It watches every Pod (not only owned ones) and
// relies on failoverCascadePredicate to let through just the failover-eligible
// phase transitions. Because the handler is EnqueueRequestForObject, the
// reconcile key is the pod's own namespace/name rather than a parent resource.
func (r *FailoverCascadeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	enqueuePod := &handler.EnqueueRequestForObject{}
	onlyCascadeEvents := builder.WithPredicates(failoverCascadePredicate())

	controller := ctrl.NewControllerManagedBy(mgr).Named("gms-failover-cascade")
	controller = controller.Watches(&corev1.Pod{}, enqueuePod, onlyCascadeEvents)
	return controller.Complete(r)
}
// isTerminalPhase reports whether phase is PodFailed or PodSucceeded.
func isTerminalPhase(phase corev1.PodPhase) bool {
	switch phase {
	case corev1.PodFailed, corev1.PodSucceeded:
		return true
	default:
		return false
	}
}
// failoverCascadePredicate filters pod events before they reach Reconcile so
// the work queue only ever holds pods that may need a cascade.
//
// An event passes only if the pod carries the dynamo failover
// engine-group-member label and is in a terminal phase:
//
//   - Create: accepted when the pod is already Failed/Succeeded. This covers
//     the informer's initial list delivering a pod whose transition happened
//     before the cache started, so no Update event was ever observed. Without
//     it such a pod's engine group would never be cascade-deleted.
//
//   - Update: the main path. Accepted when the old object was not terminal
//     and the new one is. Pods with a deletionTimestamp are rejected so we do
//     not react to pods already being removed by an ongoing cascade or a DGD
//     deletion.
//
//   - Delete / Generic: always rejected; pod deletions are the *result* of a
//     cascade, not a trigger for one.
func failoverCascadePredicate() predicate.Predicate {
	isGroupMember := func(obj client.Object) bool {
		return obj.GetLabels()[commonconsts.KubeLabelDynamoFailoverEngineGroupMember] == commonconsts.KubeLabelValueTrue
	}

	onCreate := func(e event.CreateEvent) bool {
		if !isGroupMember(e.Object) {
			return false
		}
		created, isPod := e.Object.(*corev1.Pod)
		return isPod && isTerminalPhase(created.Status.Phase)
	}

	onUpdate := func(e event.UpdateEvent) bool {
		// Reject non-members, and pods already being deleted: our own
		// cascade-delete sets deletionTimestamp before the pod actually
		// disappears from the cache.
		if !isGroupMember(e.ObjectNew) || e.ObjectNew.GetDeletionTimestamp() != nil {
			return false
		}
		current, isPod := e.ObjectNew.(*corev1.Pod)
		if !isPod {
			return false
		}
		previous, isPod := e.ObjectOld.(*corev1.Pod)
		if !isPod {
			return false
		}
		// Fire only on a real transition into a terminal phase, so a metadata
		// update on an already-Failed pod is not processed a second time.
		return isTerminalPhase(current.Status.Phase) && !isTerminalPhase(previous.Status.Phase)
	}

	return predicate.Funcs{
		CreateFunc:  onCreate,
		UpdateFunc:  onUpdate,
		DeleteFunc:  func(event.DeleteEvent) bool { return false },
		GenericFunc: func(event.GenericEvent) bool { return false },
	}
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"context"
"testing"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
// Fixture values shared by the failover cascade tests.
const (
	// cascadeTestNamespace is the namespace every test pod is created in.
	cascadeTestNamespace = "test-ns"
	// cascadeTestPCSG is the PCSG label value newFailoverPod stamps on its pods.
	cascadeTestPCSG = "my-pcsg"
)
// newFailoverPod builds a pod in cascadeTestNamespace with the given name and
// phase. The pod carries the failover engine-group-member label plus the three
// Grove labels: the PCSG is always cascadeTestPCSG, while the replica index
// and pod index come from replicaIdx and podIdx.
func newFailoverPod(name string, phase corev1.PodPhase, replicaIdx, podIdx string) *corev1.Pod {
	pod := &corev1.Pod{}
	pod.Name = name
	pod.Namespace = cascadeTestNamespace
	pod.Labels = map[string]string{
		commonconsts.KubeLabelDynamoFailoverEngineGroupMember: commonconsts.KubeLabelValueTrue,
		groveLabelPCSG:             cascadeTestPCSG,
		groveLabelPCSGReplicaIndex: replicaIdx,
		groveLabelPodIndex:         podIdx,
	}
	pod.Status.Phase = phase
	return pod
}
// newCascadeReconciler returns a FailoverCascadeReconciler backed by a fake
// client pre-loaded with objs, together with that client so tests can inspect
// the resulting cluster state. The reconciler gets a fake event recorder with
// a buffer of 16 events.
//
// It panics if the core/v1 types cannot be registered in the scheme: that is a
// broken test setup, and continuing would only surface later as confusing
// "no kind registered" failures.
func newCascadeReconciler(objs ...client.Object) (*FailoverCascadeReconciler, client.Client) {
	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		panic(err)
	}

	c := fake.NewClientBuilder().
		WithScheme(scheme).
		WithStatusSubresource(&corev1.Pod{}).
		WithObjects(objs...).
		Build()
	return NewFailoverCascadeReconciler(c, record.NewFakeRecorder(16)), c
}
// TestFailoverCascade_FailedPodDeletesEntireGroup checks that reconciling a
// Failed pod removes it along with every Running sibling that carries the same
// engine-group labels.
func TestFailoverCascade_FailedPodDeletesEntireGroup(t *testing.T) {
	ctx := context.Background()
	r, c := newCascadeReconciler(
		newFailoverPod("ldr-0", corev1.PodFailed, "0", "0"),
		newFailoverPod("gms-0-0", corev1.PodRunning, "0", "0"),
		newFailoverPod("wkr-1-0", corev1.PodRunning, "0", "0"),
	)
	req := ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: cascadeTestNamespace, Name: "ldr-0"},
	}

	got, err := r.Reconcile(ctx, req)
	require.NoError(t, err)
	assert.Equal(t, ctrl.Result{}, got)

	var pods corev1.PodList
	require.NoError(t, c.List(ctx, &pods, client.InNamespace(cascadeTestNamespace)))
	assert.Empty(t, pods.Items, "all pods in the engine group should be deleted")
}
// TestFailoverCascade_SucceededPodDeletesEntireGroup checks that the Succeeded
// phase triggers the cascade just like Failed does.
func TestFailoverCascade_SucceededPodDeletesEntireGroup(t *testing.T) {
	ctx := context.Background()
	r, c := newCascadeReconciler(
		newFailoverPod("ldr-0", corev1.PodSucceeded, "0", "0"),
		newFailoverPod("gms-0-0", corev1.PodRunning, "0", "0"),
	)
	req := ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: cascadeTestNamespace, Name: "ldr-0"},
	}

	got, err := r.Reconcile(ctx, req)
	require.NoError(t, err)
	assert.Equal(t, ctrl.Result{}, got)

	var pods corev1.PodList
	require.NoError(t, c.List(ctx, &pods, client.InNamespace(cascadeTestNamespace)))
	assert.Empty(t, pods.Items, "succeeded pod should also trigger cascade")
}
// TestFailoverCascade_DifferentGroupUnaffected checks that a pod with the same
// PCSG replica but a different pod index survives the cascade.
func TestFailoverCascade_DifferentGroupUnaffected(t *testing.T) {
	ctx := context.Background()
	r, c := newCascadeReconciler(
		newFailoverPod("ldr-0", corev1.PodFailed, "0", "0"),
		newFailoverPod("ldr-1", corev1.PodRunning, "0", "1"),
	)
	req := ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: cascadeTestNamespace, Name: "ldr-0"},
	}

	_, err := r.Reconcile(ctx, req)
	require.NoError(t, err)

	var pods corev1.PodList
	require.NoError(t, c.List(ctx, &pods, client.InNamespace(cascadeTestNamespace)))
	assert.Len(t, pods.Items, 1, "only the different engine group pod should remain")
	assert.Equal(t, "ldr-1", pods.Items[0].Name)
}
// TestFailoverCascade_MultipleFailedPodsAllDeleted checks that one reconcile
// clears the whole group even when more than one member has already failed.
func TestFailoverCascade_MultipleFailedPodsAllDeleted(t *testing.T) {
	ctx := context.Background()
	r, c := newCascadeReconciler(
		newFailoverPod("ldr-0", corev1.PodFailed, "0", "0"),
		newFailoverPod("wkr-1-0", corev1.PodFailed, "0", "0"),
		newFailoverPod("gms-0-0", corev1.PodRunning, "0", "0"),
	)
	req := ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: cascadeTestNamespace, Name: "ldr-0"},
	}

	_, err := r.Reconcile(ctx, req)
	require.NoError(t, err)

	var pods corev1.PodList
	require.NoError(t, c.List(ctx, &pods, client.InNamespace(cascadeTestNamespace)))
	assert.Empty(t, pods.Items, "all pods in the engine group should be deleted")
}
// TestFailoverCascade_PodWithoutLabelIgnored checks that a Failed pod without
// the failover engine-group-member label is left completely alone: Reconcile
// returns an empty result and the pod is still present afterwards.
func TestFailoverCascade_PodWithoutLabelIgnored(t *testing.T) {
	unlabeled := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "random-pod",
			Namespace: cascadeTestNamespace,
		},
		Status: corev1.PodStatus{Phase: corev1.PodFailed},
	}
	r, c := newCascadeReconciler(unlabeled)

	result, err := r.Reconcile(context.Background(), ctrl.Request{
		NamespacedName: types.NamespacedName{Name: "random-pod", Namespace: cascadeTestNamespace},
	})
	require.NoError(t, err)
	assert.Equal(t, ctrl.Result{}, result)

	// "Ignored" must mean "not deleted"; without this check the test would
	// still pass if Reconcile wrongly removed the pod.
	var remaining corev1.PodList
	require.NoError(t, c.List(context.Background(), &remaining, client.InNamespace(cascadeTestNamespace)))
	require.Len(t, remaining.Items, 1, "unlabeled pod must not be deleted")
	assert.Equal(t, "random-pod", remaining.Items[0].Name)
}
// TestFailoverCascade_NonFailedPodIsNoop checks that reconciling a Running pod
// deletes nothing.
func TestFailoverCascade_NonFailedPodIsNoop(t *testing.T) {
	ctx := context.Background()
	r, c := newCascadeReconciler(
		newFailoverPod("ldr-0", corev1.PodRunning, "0", "0"),
		newFailoverPod("gms-0-0", corev1.PodRunning, "0", "0"),
	)
	req := ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: cascadeTestNamespace, Name: "ldr-0"},
	}

	_, err := r.Reconcile(ctx, req)
	require.NoError(t, err)

	var pods corev1.PodList
	require.NoError(t, c.List(ctx, &pods, client.InNamespace(cascadeTestNamespace)))
	assert.Len(t, pods.Items, 2, "running pod should not trigger cascade")
}
// TestFailoverCascade_NotFoundPodIsNoop checks that a request for a pod that
// does not exist completes without error and without a requeue.
func TestFailoverCascade_NotFoundPodIsNoop(t *testing.T) {
	r, _ := newCascadeReconciler()
	missing := types.NamespacedName{Namespace: cascadeTestNamespace, Name: "gone"}

	got, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: missing})

	require.NoError(t, err)
	assert.Equal(t, ctrl.Result{}, got)
}
// TestFailoverCascade_MissingGroveLabelsIsNoop checks that a Failed failover
// pod lacking the replica-index and pod-index Grove labels does not trigger a
// cascade: Reconcile returns an empty result and the pod is still present.
func TestFailoverCascade_MissingGroveLabelsIsNoop(t *testing.T) {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "partial-labels",
			Namespace: cascadeTestNamespace,
			Labels: map[string]string{
				commonconsts.KubeLabelDynamoFailoverEngineGroupMember: commonconsts.KubeLabelValueTrue,
				groveLabelPCSG: "my-pcsg",
			},
		},
		Status: corev1.PodStatus{Phase: corev1.PodFailed},
	}
	r, c := newCascadeReconciler(pod)

	result, err := r.Reconcile(context.Background(), ctrl.Request{
		NamespacedName: types.NamespacedName{Name: "partial-labels", Namespace: cascadeTestNamespace},
	})
	require.NoError(t, err)
	assert.Equal(t, ctrl.Result{}, result)

	// A no-op must leave the pod in place; a selector built from empty label
	// values could otherwise delete it unnoticed.
	var remaining corev1.PodList
	require.NoError(t, c.List(context.Background(), &remaining, client.InNamespace(cascadeTestNamespace)))
	require.Len(t, remaining.Items, 1, "pod with incomplete Grove labels must not be deleted")
	assert.Equal(t, "partial-labels", remaining.Items[0].Name)
}
// TestFailoverCascade_DifferentPCSGReplicaUnaffected checks that a pod with
// the same pod index but a different PCSG replica index survives the cascade.
func TestFailoverCascade_DifferentPCSGReplicaUnaffected(t *testing.T) {
	ctx := context.Background()
	r, c := newCascadeReconciler(
		newFailoverPod("ldr-0", corev1.PodFailed, "0", "0"),
		newFailoverPod("ldr-r1-0", corev1.PodRunning, "1", "0"),
	)
	req := ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: cascadeTestNamespace, Name: "ldr-0"},
	}

	_, err := r.Reconcile(ctx, req)
	require.NoError(t, err)

	var pods corev1.PodList
	require.NoError(t, c.List(ctx, &pods, client.InNamespace(cascadeTestNamespace)))
	assert.Len(t, pods.Items, 1, "only the different PCSG replica pod should remain")
	assert.Equal(t, "ldr-r1-0", pods.Items[0].Name)
}
// TestFailoverCascade_DeletingPodIsSkipped checks that a Failed pod which
// already has a deletionTimestamp does not start a cascade. The pod is given a
// finalizer alongside the timestamp so it is still present in the fake client
// when Reconcile runs.
func TestFailoverCascade_DeletingPodIsSkipped(t *testing.T) {
	ctx := context.Background()
	deletedAt := metav1.Now()

	terminating := newFailoverPod("ldr-0", corev1.PodFailed, "0", "0")
	terminating.Finalizers = []string{"test-finalizer"}
	terminating.DeletionGracePeriodSeconds = ptr.To(int64(0))
	terminating.DeletionTimestamp = &deletedAt

	r, c := newCascadeReconciler(terminating, newFailoverPod("gms-0-0", corev1.PodRunning, "0", "0"))
	req := ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: cascadeTestNamespace, Name: "ldr-0"},
	}

	got, err := r.Reconcile(ctx, req)
	require.NoError(t, err)
	assert.Equal(t, ctrl.Result{}, got)

	var pods corev1.PodList
	require.NoError(t, c.List(ctx, &pods, client.InNamespace(cascadeTestNamespace)))
	assert.Len(t, pods.Items, 2, "already-deleting pod should not trigger a cascade")
}
// TestFailoverCascade_ConcurrentReconcileIsIdempotent reconciles two Failed
// pods of the same engine group one after the other and checks that the second
// call, whose pod was already removed by the first, succeeds as a no-op.
func TestFailoverCascade_ConcurrentReconcileIsIdempotent(t *testing.T) {
	ctx := context.Background()
	r, c := newCascadeReconciler(
		newFailoverPod("ldr-0", corev1.PodFailed, "0", "0"),
		newFailoverPod("wkr-1-0", corev1.PodFailed, "0", "0"),
	)

	// The first name triggers the cascade; the second is already gone
	// (NotFound) by the time its request is handled.
	for _, podName := range []string{"ldr-0", "wkr-1-0"} {
		_, err := r.Reconcile(ctx, ctrl.Request{
			NamespacedName: types.NamespacedName{Namespace: cascadeTestNamespace, Name: podName},
		})
		require.NoError(t, err)
	}

	var pods corev1.PodList
	require.NoError(t, c.List(ctx, &pods, client.InNamespace(cascadeTestNamespace)))
	assert.Empty(t, pods.Items)
}
......@@ -25,7 +25,11 @@ const (
// ClaimName is the pod-level DRA ResourceClaim name for shared GPU access.
ClaimName = "intrapod-shared-gpu"
defaultDeviceClassName = "gpu.nvidia.com"
// DefaultDeviceClassName is the default DRA DeviceClass name used when a
// component does not specify an explicit gpuType. It matches the
// DeviceClass that ships with the NVIDIA DRA Driver and is the single
// source of truth for this string across the operator.
DefaultDeviceClassName = "gpu.nvidia.com"
)
// ApplyClaim replaces the first container's nvidia.com/gpu resources with a
......@@ -120,7 +124,7 @@ func GenerateResourceClaimTemplate(
}
if deviceClassName == "" {
deviceClassName = defaultDeviceClassName
deviceClassName = DefaultDeviceClassName
}
if cl != nil {
......
......@@ -100,7 +100,7 @@ func TestGenerateResourceClaimTemplate_Enabled(t *testing.T) {
assert.Equal(t, "myapp-worker-gpu", tmpl.Name)
require.Len(t, tmpl.Spec.Spec.Devices.Requests, 1)
req := tmpl.Spec.Spec.Devices.Requests[0]
assert.Equal(t, defaultDeviceClassName, req.Exactly.DeviceClassName)
assert.Equal(t, DefaultDeviceClassName, req.Exactly.DeviceClassName)
assert.Equal(t, int64(4), req.Exactly.Count)
}
......
......@@ -29,6 +29,25 @@ type VLLMBackend struct {
}
func (b *VLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) {
// The inter-pod GMS layout (with or without failover) requires the engine
// to load weights from the dedicated GMS weight-server pod rather than
// from disk. --load-format gms and DYN_VLLM_GMS_SHADOW_MODE activate the
// vLLM-side GMS client path and apply to both standalone inter-pod GMS
// and inter-pod GMS + failover; the "shadow mode" name is a vLLM upstream
// naming convention, not a statement about whether shadow pods are
// present.
if component.IsInterPodGMSEnabled() {
if !containerHasArg(container, "--load-format", "gms") {
injectFlagsIntoContainerCommand(container, "--load-format gms", false, "vllm")
}
// DYN_VLLM_GMS_SHADOW_MODE is a vLLM-engine-specific switch (activates
// the vLLM-side GMS client path for shadow weight loading). It is
// injected here — in the vLLM backend — rather than in the backend-
// agnostic GMS helpers so non-vLLM backends do not inherit a stray,
// meaningless env var if/when inter-pod GMS is extended to them.
container.Env = append(container.Env, corev1.EnvVar{Name: "DYN_VLLM_GMS_SHADOW_MODE", Value: "true"})
}
isMultinode := numberOfNodes > 1
if isMultinode {
......
......@@ -980,3 +980,73 @@ func TestShouldUseMpBackend(t *testing.T) {
})
}
}
// TestVLLMBackend_UpdateContainer_InterPodGMS asserts that when the inter-pod
// GMS layout is enabled (gpuMemoryService.mode=interPod, with or without
// failover), the vLLM backend is the one responsible for injecting both the
// --load-format=gms flag and the DYN_VLLM_GMS_SHADOW_MODE env var. These are
// vLLM-runtime switches and must live in the backend adapter, not in the
// backend-agnostic GMS helpers (see gmsEngineEnvVars).
func TestVLLMBackend_UpdateContainer_InterPodGMS(t *testing.T) {
	backend := &VLLMBackend{}

	component := &v1alpha1.DynamoComponentDeploymentSharedSpec{
		GPUMemoryService: &v1alpha1.GPUMemoryServiceSpec{
			Enabled: true,
			Mode:    v1alpha1.GMSModeInterPod,
		},
		Failover: &v1alpha1.FailoverSpec{
			Enabled: true,
			Mode:    v1alpha1.GMSModeInterPod,
		},
	}

	container := &corev1.Container{
		Command: []string{"python3"},
		Args:    []string{"-m", "dynamo.vllm"},
	}

	backend.UpdateContainer(container, 1, RoleMain, component, "svc", &GroveMultinodeDeployer{})

	// --load-format gms must have been injected. The args are formatted only
	// on failure, directly from the slice, instead of being concatenated up
	// front on every run.
	if !containerHasArg(container, "--load-format", "gms") {
		t.Errorf("expected --load-format gms to be injected; got args=%q", container.Args)
	}

	// DYN_VLLM_GMS_SHADOW_MODE must be set exactly once, with value "true".
	count := 0
	for _, e := range container.Env {
		if e.Name != "DYN_VLLM_GMS_SHADOW_MODE" {
			continue
		}
		count++
		if e.Value != "true" {
			t.Errorf("DYN_VLLM_GMS_SHADOW_MODE value = %q, want %q", e.Value, "true")
		}
	}
	if count != 1 {
		t.Errorf("DYN_VLLM_GMS_SHADOW_MODE env var count = %d, want 1", count)
	}
}
// TestVLLMBackend_UpdateContainer_NoInterPodGMS covers the opposite case from
// the inter-pod GMS test: with an empty component spec (no GPUMemoryService,
// no Failover) the vLLM backend must leave DYN_VLLM_GMS_SHADOW_MODE out of the
// container environment, since the variable has no meaning outside the
// inter-pod layout.
func TestVLLMBackend_UpdateContainer_NoInterPodGMS(t *testing.T) {
	var (
		backend   = &VLLMBackend{}
		component = &v1alpha1.DynamoComponentDeploymentSharedSpec{}
		container = &corev1.Container{
			Command: []string{"python3"},
			Args:    []string{"-m", "dynamo.vllm"},
		}
	)

	backend.UpdateContainer(container, 1, RoleMain, component, "svc", &GroveMultinodeDeployer{})

	for i := range container.Env {
		if container.Env[i].Name != "DYN_VLLM_GMS_SHADOW_MODE" {
			continue
		}
		t.Errorf("DYN_VLLM_GMS_SHADOW_MODE must not be injected when inter-pod GMS is disabled")
	}
}
......@@ -93,6 +93,10 @@ func (b *BaseComponentDefaults) getCommonContainer(context ComponentContext) cor
},
}
container.Env = []corev1.EnvVar{
{
Name: "CONTAINER_NAME",
Value: commonconsts.MainContainerName,
},
{
Name: commonconsts.DynamoNamespaceEnvVar,
Value: context.DynamoNamespace,
......@@ -144,10 +148,9 @@ func (b *BaseComponentDefaults) getCommonContainer(context ComponentContext) cor
}
if context.Discovery.Mode == configv1alpha1.KubeDiscoveryModeContainer {
container.Env = append(container.Env, corev1.EnvVar{
Name: "CONTAINER_NAME",
Value: container.Name,
})
// CONTAINER_NAME is already injected unconditionally above with
// MainContainerName (which equals container.Name here); do not append
// it again or we end up with two env entries of the same name.
container.Env = append(container.Env, corev1.EnvVar{
Name: "DYN_KUBE_DISCOVERY_MODE",
Value: string(configv1alpha1.KubeDiscoveryModeContainer),
......
......@@ -82,6 +82,7 @@ func TestPlannerDefaults_GetBaseContainer(t *testing.T) {
FailureThreshold: 720,
},
Env: []corev1.EnvVar{
{Name: "CONTAINER_NAME", Value: commonconsts.MainContainerName},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "dynamo-namespace"},
{Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypePlanner},
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "name"},
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dynamo
......@@ -9,22 +21,389 @@ import (
"fmt"
"path/filepath"
"strconv"
"strings"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dra"
gmsruntime "github.com/ai-dynamo/dynamo/deploy/operator/internal/gms"
grovev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
corev1 "k8s.io/api/core/v1"
resourcev1 "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
)
// failoverLockFile is the path of the "failover.lock" file directly under
// gmsruntime.SharedMountPath.
var failoverLockFile = filepath.Join(gmsruntime.SharedMountPath, "failover.lock")
// ──────────────────────────────────────────────────────────────────────────────
// Inter-pod GMS failover (Mode: interPod)
//
// A dedicated GMS weight server pod is created per rank. Engine pods share GPU
// memory via DRA ResourceClaims and a hostPath volume for UDS sockets.
// ──────────────────────────────────────────────────────────────────────────────
const (
	// gmsSharedVolumeName is presumably the pod Volume name of the shared
	// hostPath volume — TODO confirm against gmsSharedVolume.
	gmsSharedVolumeName = "gms-shared"
	// gmsHostPathBase is presumably the node-side base directory for the
	// hostPath volume — TODO confirm against gmsSharedVolume.
	gmsHostPathBase = "/run/gms"
	// gmsSharedMountPath is the in-container directory holding the GMS UDS
	// sockets (gms_*.sock); the wrapper script, the startup probe and the
	// engine env vars all point at it.
	gmsSharedMountPath = "/run/gms/shared"
	// gmsFailoverLockFile is the lock file name that gmsEngineEnvVars appends
	// to gmsSharedMountPath to form FAILOVER_LOCK_PATH.
	gmsFailoverLockFile = "failover.lock"
	// gmsPermFixInitName is presumably the name of the permission-fix init
	// container — TODO confirm against gmsPermFixInitContainer.
	gmsPermFixInitName = "fix-gms-perms"
)
// gmsWrapperScript renders the bash script used as the GMS weight-server
// container's entrypoint. The script:
//
//   - removes gms_*.sock files left in gmsSharedMountPath by a previous run,
//   - starts the GMS server module (gmsruntime.ServerModule) in the background,
//   - traps SIGTERM/SIGINT and forwards termination to the process group,
//   - waits for the server and exits with the server's exit code, so the
//     container's exitCode in the Pod status reflects the actual failure mode
//     instead of always being 1.
func gmsWrapperScript() string {
	const scriptTemplate = `rm -f %s/gms_*.sock
rc=1
cleanup() { kill -- -$$ 2>/dev/null; exit "$rc"; }
trap cleanup SIGTERM SIGINT
python3 -m %s &
echo "Started GMS server pid=$!"
wait -n
rc=$?
echo "GMS server exited (code=$rc), shutting down"
cleanup`

	return fmt.Sprintf(scriptTemplate, gmsSharedMountPath, gmsruntime.ServerModule)
}
// gmsStartupProbeCommand builds the exec-probe command for the GMS server's
// startup probe. The command succeeds once at least 2*gpuCount gms_*.sock
// files exist under gmsSharedMountPath, i.e. two sockets (weights and
// kv_cache) per allocated GPU.
func gmsStartupProbeCommand(gpuCount int) []string {
	wantSockets := 2 * gpuCount
	check := fmt.Sprintf("test $(ls %s/gms_*.sock 2>/dev/null | wc -l) -ge %d", gmsSharedMountPath, wantSockets)
	return []string{"sh", "-c", check}
}
// applyGMSSharedResources applies, in place, the pod-level and container-level
// changes shared by GMS weight-server pods and engine pods:
//
//   - strips the GPU entry from the container's limits (DRA handles allocation),
//   - adds the GPU toleration to the pod,
//   - adds the rank-specific shared volume to the pod and mounts it in c,
//   - appends the permission-fix init container (built from c's image) to the
//     end of the pod's init container list.
//
// NOTE(review): earlier documentation described the init container as being
// prepended; the code appends it, so it runs after any init containers already
// on the pod — confirm that ordering is intended.
func applyGMSSharedResources(podSpec *corev1.PodSpec, c *corev1.Container, rank int32) {
	removeGPUFromLimits(c)
	addGPUToleration(podSpec)

	sharedVol, sharedMount := gmsSharedVolume(rank)
	podSpec.Volumes = append(podSpec.Volumes, sharedVol)
	c.VolumeMounts = append(c.VolumeMounts, sharedMount)

	permFix := gmsPermFixInitContainer(rank, c.Image)
	podSpec.InitContainers = append(podSpec.InitContainers, permFix)
}
// gmsWeightServerPodSpec derives the pod spec for a GMS weight-server pod from
// a deep copy of basePodSpec; the base spec itself is never modified. If the
// copy has no containers it is returned unchanged. Otherwise the first
// container is rewritten to:
//
//   - run the GMS wrapper script via "bash -c",
//   - drop its liveness and readiness probes,
//   - use a startup probe that waits (2s period, 150 failures, about 5 minutes)
//     for the expected number of GMS UDS sockets for gpuCount GPUs,
//   - export the socket directory through gmsruntime.EnvSocketDir,
//   - receive the shared GMS resources for the given rank.
//
// RestartPolicy is intentionally not set here, so the pod inherits the base /
// Grove default (Always). A GMS server process holds only local state — GPU
// allocations (via DRA, which survive the container), hostPath UDS sockets
// (recreated by gmsWrapperScript on startup), and in-memory weight buffers
// (re-sharded on reconnection by the engine clients) — so an in-place kubelet
// restart is a fast, correct recovery path.
//
// The paired engine pod mirrors this policy in the standalone inter-pod GMS
// layout (a restarted engine re-imports IPC handles from the still-running GMS
// server). In the inter-pod GMS failover layout, augmentEngineForGMS overrides
// the engine's RestartPolicy to Never so the cohort can only be recovered via
// FailoverCascadeReconciler; see the comment there.
func gmsWeightServerPodSpec(basePodSpec *corev1.PodSpec, rank int32, gpuCount int) *corev1.PodSpec {
	spec := basePodSpec.DeepCopy()
	if len(spec.Containers) < 1 {
		return spec
	}

	server := &spec.Containers[0]
	server.Command = []string{"bash", "-c"}
	server.Args = []string{gmsWrapperScript()}

	// Only the startup probe applies to the GMS server.
	server.LivenessProbe, server.ReadinessProbe = nil, nil
	startup := &corev1.Probe{
		PeriodSeconds:    2,
		TimeoutSeconds:   2,
		FailureThreshold: 150, // 2s * 150 = 5 min
	}
	startup.Exec = &corev1.ExecAction{Command: gmsStartupProbeCommand(gpuCount)}
	server.StartupProbe = startup

	socketDir := corev1.EnvVar{Name: gmsruntime.EnvSocketDir, Value: gmsSharedMountPath}
	server.Env = append(server.Env, socketDir)

	applyGMSSharedResources(spec, server, rank)
	return spec
}
// gmsEngineEnvVars returns the backend-agnostic environment variables for GMS
// engine containers, in this order:
//
//   - ENGINE_ID: taken from the pod's grove.io/podclique-pod-index label,
//   - gmsruntime.EnvSocketDir: the shared socket directory,
//   - FAILOVER_LOCK_PATH: the failover lock file inside that directory,
//   - DYN_SYSTEM_STARTING_HEALTH_STATUS: "notready".
//
// Backend-specific switches (e.g. the vLLM DYN_VLLM_GMS_SHADOW_MODE flag) are
// deliberately absent; they are injected by the backend's UpdateContainer path
// so non-vLLM backends do not inherit stray env vars.
func gmsEngineEnvVars() []corev1.EnvVar {
	engineID := corev1.EnvVar{Name: "ENGINE_ID"}
	engineID.ValueFrom = &corev1.EnvVarSource{
		FieldRef: &corev1.ObjectFieldSelector{
			FieldPath: "metadata.labels['grove.io/podclique-pod-index']",
		},
	}

	lockPath := gmsSharedMountPath + "/" + gmsFailoverLockFile

	return []corev1.EnvVar{
		engineID,
		{Name: gmsruntime.EnvSocketDir, Value: gmsSharedMountPath},
		{Name: "FAILOVER_LOCK_PATH", Value: lockPath},
		{Name: "DYN_SYSTEM_STARTING_HEALTH_STATUS", Value: "notready"},
	}
}
// augmentEngineForGMS adapts an engine pod spec, in place, to the inter-pod
// GMS layout. It appends the GMS engine env vars to the first container,
// drops DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS, and delegates the shared
// volume / GPU / toleration / init-container wiring to
// applyGMSSharedResources. A spec with no containers is left untouched.
//
// RestartPolicy is the single asymmetry between the two inter-pod layouts:
//
//   - Standalone inter-pod GMS (isInterPodFailover=false): RestartPolicy is
//     not set (inherits Always), like the GMS weight-server pod. A crashed
//     engine is restarted in place by kubelet; the GMS server keeps running
//     and the new engine container reconnects to the existing UDS sockets
//     and re-imports CUDA IPC handles during --load-format gms startup.
//     There is no cohort state to protect — just one engine paired with one
//     GMS server per rank.
//
//   - Inter-pod GMS failover (isInterPodFailover=true): RestartPolicy is
//     forced to Never. Engine pods in a failover cohort hold distributed
//     state that does not survive an in-place container restart — active
//     NCCL collectives, torch.distributed TCPStore membership, and
//     primary/shadow coordination through the failover lock file and
//     DYN_VLLM_GMS_SHADOW_MODE. An in-place restart would leave the cohort
//     half torn down and block recovery. Instead the pod exits,
//     FailoverCascadeReconciler (see failover_cascade_controller.go)
//     force-deletes the whole engine group selected by the
//     KubeLabelDynamoFailoverEngineGroupMember label, and Grove recreates
//     the cohort. That label is applied in graph.go only when
//     isInterPodFailover is true, so forcing Never in the standalone case
//     would strand engine pods in Failed state with nothing to delete them.
func augmentEngineForGMS(podSpec *corev1.PodSpec, rank int32, isInterPodFailover bool) {
	if len(podSpec.Containers) == 0 {
		return
	}

	engine := &podSpec.Containers[0]
	engine.Env = append(engine.Env, gmsEngineEnvVars()...)
	removeEnvVar(engine, "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS")
	applyGMSSharedResources(podSpec, engine, rank)

	if !isInterPodFailover {
		// Standalone layout: keep the inherited restart policy.
		return
	}
	podSpec.RestartPolicy = corev1.RestartPolicyNever
}
// gmsSharedVolume builds the hostPath volume (DirectoryOrCreate) together
// with its mount. The mount's subPathExpr combines the PCSG name, the PCSG
// index and the rank, so each PCSG replica and each rank gets its own
// subdirectory of the shared host path.
func gmsSharedVolume(rank int32) (corev1.Volume, corev1.VolumeMount) {
	dirOrCreate := corev1.HostPathDirectoryOrCreate
	source := corev1.VolumeSource{
		HostPath: &corev1.HostPathVolumeSource{
			Path: gmsHostPathBase,
			Type: &dirOrCreate,
		},
	}
	subPath := fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)/rank-%d", rank)

	return corev1.Volume{
			Name:         gmsSharedVolumeName,
			VolumeSource: source,
		}, corev1.VolumeMount{
			Name:        gmsSharedVolumeName,
			MountPath:   gmsSharedMountPath,
			SubPathExpr: subPath,
		}
}
// gmsPermFixInitContainer builds a root init container that runs
// "chmod 1777" on the shared mount path so that the non-root application
// user can create UDS sockets and lock files there. It reuses the mount (and
// therefore the subPathExpr) returned by gmsSharedVolume for the same rank,
// so kubelet creates the isolated subdirectory before the main container
// starts.
func gmsPermFixInitContainer(rank int32, image string) corev1.Container {
	_, sharedMount := gmsSharedVolume(rank)

	// chmod on the hostPath mount needs uid 0. RunAsNonRoot is set to false
	// explicitly rather than left unset.
	// NOTE(review): the original rationale was that this keeps cluster-wide
	// PodSecurity policies and pod-level SecurityContext defaults from
	// silently rejecting the container at admission — confirm against the
	// target cluster's policy.
	rootContext := &corev1.SecurityContext{
		RunAsUser:    ptr.To[int64](0),
		RunAsNonRoot: ptr.To(false),
	}
	chmod := fmt.Sprintf("chmod 1777 %s", gmsSharedMountPath)

	return corev1.Container{
		Name:            gmsPermFixInitName,
		Image:           image,
		Command:         []string{"sh", "-c", chmod},
		SecurityContext: rootContext,
		VolumeMounts:    []corev1.VolumeMount{sharedMount},
	}
}
// removeGPUFromLimits drops the nvidia.com/gpu entry from both the limits and
// the requests of the container, because GPU allocation for GMS pods is
// handled through DRA. Missing (nil) resource lists are tolerated.
func removeGPUFromLimits(c *corev1.Container) {
	const gpuResource = "nvidia.com/gpu"
	for _, list := range []corev1.ResourceList{c.Resources.Limits, c.Resources.Requests} {
		delete(list, gpuResource)
	}
}
// addGPUToleration makes sure the pod tolerates the nvidia.com/gpu NoSchedule
// taint, so pods that no longer carry explicit GPU limits can still be
// scheduled on GPU nodes. Nothing is added when a toleration with the same
// key and effect is already present.
func addGPUToleration(podSpec *corev1.PodSpec) {
	const gpuTaintKey = "nvidia.com/gpu"

	for i := range podSpec.Tolerations {
		existing := &podSpec.Tolerations[i]
		if existing.Key == gpuTaintKey && existing.Effect == corev1.TaintEffectNoSchedule {
			return
		}
	}

	podSpec.Tolerations = append(podSpec.Tolerations, corev1.Toleration{
		Key:      gpuTaintKey,
		Operator: corev1.TolerationOpExists,
		Effect:   corev1.TaintEffectNoSchedule,
	})
}
// removeEnvVar deletes every env var called name from the container. The
// filtering is done in place: surviving entries are compacted towards the
// front of the existing backing array and c.Env is re-sliced to the new
// length.
func removeEnvVar(c *corev1.Container, name string) {
	kept := 0
	for _, env := range c.Env {
		if env.Name == name {
			continue
		}
		// kept never runs ahead of the read position, so this is safe.
		c.Env[kept] = env
		kept++
	}
	c.Env = c.Env[:kept]
}
// getGPUCount reads the GPU count from the component's resource limits.
// It returns 0 when resources or its limits are missing, when no GPU value
// is set, or when the value does not parse as a base-10 32-bit integer.
func getGPUCount(resources *v1alpha1.Resources) int32 {
	if resources == nil || resources.Limits == nil {
		return 0
	}
	raw := resources.Limits.GPU
	if raw == "" {
		return 0
	}
	count, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		return 0
	}
	return int32(count)
}
// getDeviceClassName picks the DRA device class name. A non-empty gpuType in
// the resource limits wins; otherwise the default device class of the NVIDIA
// DRA driver is used. The "gpu.nvidia.com" literal is intentionally not
// repeated here — the dra package is its single source of truth.
func getDeviceClassName(resources *v1alpha1.Resources) string {
	if resources == nil || resources.Limits == nil || resources.Limits.GPUType == "" {
		return dra.DefaultDeviceClassName
	}
	return resources.Limits.GPUType
}
// gmsRCTName derives the ResourceClaimTemplate name for one rank of a
// service. The result is deterministic: "<serviceName>-gpu-rank-<rank>".
func gmsRCTName(serviceName string, rank int32) string {
	rankSuffix := fmt.Sprintf("-gpu-rank-%d", rank)
	return serviceName + rankSuffix
}
// gmsResourceClaimTemplateConfigs produces one PCS-level
// ResourceClaimTemplateConfig for every distinct rank found in roles, in
// order of first appearance. All templates request the same GPU spec (device
// class and exact count taken from resources) but carry a per-rank name, so
// the GMS and engine pods of each rank end up with their own ResourceClaim.
func gmsResourceClaimTemplateConfigs(serviceName string, resources *v1alpha1.Resources, roles []ServiceRole) []grovev1alpha1.ResourceClaimTemplateConfig {
	// The GPU request is identical for every rank; resolve it once.
	deviceClass := getDeviceClassName(resources)
	gpuCount := int64(getGPUCount(resources))

	emitted := make(map[int32]struct{}, len(roles))
	out := make([]grovev1alpha1.ResourceClaimTemplateConfig, 0, len(roles))
	for _, role := range roles {
		if _, dup := emitted[role.Rank]; dup {
			continue
		}
		emitted[role.Rank] = struct{}{}

		request := resourcev1.DeviceRequest{
			Name: "gpu",
			Exactly: &resourcev1.ExactDeviceRequest{
				DeviceClassName: deviceClass,
				AllocationMode:  resourcev1.DeviceAllocationModeExactCount,
				Count:           gpuCount,
			},
		}
		out = append(out, grovev1alpha1.ResourceClaimTemplateConfig{
			Name: gmsRCTName(serviceName, role.Rank),
			TemplateSpec: resourcev1.ResourceClaimTemplateSpec{
				Spec: resourcev1.ResourceClaimSpec{
					Devices: resourcev1.DeviceClaim{
						Requests: []resourcev1.DeviceRequest{request},
					},
				},
			},
		})
	}
	return out
}
// gmsResourceSharingEntries produces one PCSG-level resource sharing spec per
// rank, ordered by the first appearance of each rank in roles. Every entry
// has PerReplica scope and a filter naming only the (lower-cased) cliques of
// that rank — the GMS clique and the engine clique — which keeps GPUs
// isolated between ranks.
func gmsResourceSharingEntries(serviceName string, roles []ServiceRole) []grovev1alpha1.PCSGResourceSharingSpec {
	cliquesByRank := make(map[int32][]string)
	var ranks []int32 // remembers first-seen order; map iteration is random
	for _, role := range roles {
		if _, known := cliquesByRank[role.Rank]; !known {
			ranks = append(ranks, role.Rank)
		}
		cliquesByRank[role.Rank] = append(cliquesByRank[role.Rank], strings.ToLower(role.Name))
	}

	entries := make([]grovev1alpha1.PCSGResourceSharingSpec, 0, len(ranks))
	for _, rank := range ranks {
		sharing := grovev1alpha1.ResourceSharingSpec{
			Name:  gmsRCTName(serviceName, rank),
			Scope: grovev1alpha1.ResourceSharingScopePerReplica,
		}
		entries = append(entries, grovev1alpha1.PCSGResourceSharingSpec{
			ResourceSharingSpec: sharing,
			Filter: &grovev1alpha1.PCSGResourceSharingFilter{
				ChildCliqueNames: cliquesByRank[rank],
			},
		})
	}
	return entries
}
// ──────────────────────────────────────────────────────────────────────────────
// Intra-pod GMS failover (Mode: intraPod)
//
// The main container is cloned into two engine containers (active + standby)
// within the same pod. GPU access is shared via DRA and a GMS sidecar
// injects weights via the shared emptyDir volume.
// ──────────────────────────────────────────────────────────────────────────────
// intraPodFailoverLockFile is the lock file path used by engine containers to
// coordinate active/standby election within the same pod. It is the file
// "failover.lock" under gmsruntime.SharedMountPath.
var intraPodFailoverLockFile = filepath.Join(gmsruntime.SharedMountPath, "failover.lock")
const (
// failoverEngineCount is presumably the number of engine containers created
// per pod in intra-pod failover mode (active + standby) — its use is not
// visible in this section; verify against buildFailoverPod.
failoverEngineCount = 2
)
// isFailoverEnabled reports whether the component uses intra-pod failover,
// i.e. failover is enabled AND its mode is GMSModeIntraPod. In that mode the
// main container is cloned into active + standby containers within the same
// pod.
//
// Inter-pod failover (Mode=interPod) is handled separately via
// expandRolesForService and generatePodSpecForRole — it does not use
// container cloning — so this function returns false for it. A nil component
// or a component without a Failover section also yields false.
func isFailoverEnabled(component *v1alpha1.DynamoComponentDeploymentSharedSpec) bool {
	if component == nil || component.Failover == nil {
		return false
	}
	// The mode check is essential: without it inter-pod failover services
	// would be routed through the intra-pod container-cloning path.
	return component.Failover.Enabled &&
		component.Failover.Mode == v1alpha1.GMSModeIntraPod
}
// buildFailoverPod clones the main container into two engine containers (active + standby).
......@@ -95,11 +474,10 @@ func buildEngineContainer(base corev1.Container, engineID int, systemPort int) c
}
}
containerName := fmt.Sprintf("engine-%d", engineID)
failoverEnvs := []corev1.EnvVar{
{Name: "ENGINE_ID", Value: strconv.Itoa(engineID)},
{Name: "CONTAINER_NAME", Value: containerName},
{Name: "FAILOVER_LOCK_PATH", Value: failoverLockFile},
{Name: "CONTAINER_NAME", Value: engine.Name},
{Name: "FAILOVER_LOCK_PATH", Value: intraPodFailoverLockFile},
{Name: "DYN_SYSTEM_STARTING_HEALTH_STATUS", Value: "notready"},
{Name: "DYN_SYSTEM_PORT", Value: strconv.Itoa(systemPort)},
{Name: "DYN_SYSTEM_ENABLED", Value: "true"},
......
This diff is collapsed.
......@@ -23,31 +23,55 @@ import (
// GroveMultinodeDeployer is the MultinodeDeployer variant for Grove. Its
// methods produce hostnames and node ranks expressed with $(GROVE_...)
// placeholders.
type GroveMultinodeDeployer struct {
MultinodeDeployer
// IsInterPodGMS is true when this deployer produces pod specs for an
// engine PCLQ that uses the inter-pod GMS *layout* (one engine pod per
// rank, per shadow, with a dedicated GMS weight server pod). It is a
// layout/topology flag — not a failover policy flag — and governs how
// hostnames, node ranks, and per-pod wiring are computed. Today this
// layout is only produced when inter-pod GMS failover is enabled, but
// the deployer itself should not encode that assumption.
IsInterPodGMS bool
Rank int32 // explicit node rank (used when IsInterPodGMS is true)
}
// GetLeaderHostname returns the hostname template of the leader pod for
// serviceName. The service name is lower-cased and the result still contains
// $(GROVE_...) placeholders.
//
// In the inter-pod GMS layout the pod index is left as the dynamic
// $(GROVE_PCLQ_POD_INDEX) placeholder; otherwise the leader is always pod 0.
func (d *GroveMultinodeDeployer) GetLeaderHostname(serviceName string) string {
	if d.IsInterPodGMS {
		// GMS: each PCLQ has multiple replicas; pods at the same index across
		// ranks form a communication group, so use the dynamic pod index.
		return fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-%s-%s-$(GROVE_PCLQ_POD_INDEX).$(GROVE_HEADLESS_SERVICE)",
			strings.ToLower(serviceName), commonconsts.GroveRoleSuffixLeader)
	}
	return fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-%s-%s-0.$(GROVE_HEADLESS_SERVICE)",
		strings.ToLower(serviceName), commonconsts.GroveRoleSuffixLeader)
}
// GetNodeRank returns the node rank expression for the current pod together
// with a flag that is true when the expression needs shell expansion.
//
// In the inter-pod GMS layout the rank is the explicit d.Rank rendered as a
// literal number (flag false). Otherwise it is a shell arithmetic expression
// over GROVE_PCLQ_POD_INDEX (flag true).
func (d *GroveMultinodeDeployer) GetNodeRank() (string, bool) {
	if !d.IsInterPodGMS {
		// This requires shell expansion for the arithmetic expression.
		return "$((GROVE_PCLQ_POD_INDEX + 1))", true
	}
	return fmt.Sprintf("%d", d.Rank), false
}
// NeedsDNSWait reports whether pods must wait for DNS before starting.
// It is always false for Grove, which handles startup coordination
// differently.
func (d *GroveMultinodeDeployer) NeedsDNSWait() bool {
	return false
}
func (d *GroveMultinodeDeployer) GetHostNames(serviceName string, numberOfNodes int32) []string {
hostnames := make([]string, 0, numberOfNodes)
leaderHostname := d.GetLeaderHostname(serviceName)
hostnames = append(hostnames, leaderHostname)
// Add worker hostnames
hostnames = append(hostnames, d.GetLeaderHostname(serviceName))
if d.IsInterPodGMS {
for rank := int32(1); rank < numberOfNodes; rank++ {
hostname := fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-%s-%s-%d-$(GROVE_PCLQ_POD_INDEX).$(GROVE_HEADLESS_SERVICE)",
strings.ToLower(serviceName), commonconsts.GroveRoleSuffixWorker, rank)
hostnames = append(hostnames, hostname)
}
} else {
for i := int32(0); i < numberOfNodes-1; i++ {
workerHostname := fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-%s-%s-%d.$(GROVE_HEADLESS_SERVICE)",
hostname := fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-%s-%s-%d.$(GROVE_HEADLESS_SERVICE)",
strings.ToLower(serviceName), commonconsts.GroveRoleSuffixWorker, i)
hostnames = append(hostnames, workerHostname)
hostnames = append(hostnames, hostname)
}
}
return hostnames
}
......@@ -63,18 +87,16 @@ func GetComponentReadinessAndServiceReplicaStatuses(ctx context.Context, client
serviceStatuses := make(map[string]v1alpha1.ServiceReplicaStatus, len(dgd.Spec.Services))
for serviceName, component := range dgd.Spec.Services {
isMultinode := component.GetNumberOfNodes() > 1
usesPCSG := component.GetNumberOfNodes() > 1 || component.IsInterPodGMSEnabled()
resourceName := fmt.Sprintf("%s-0-%s", dgd.Name, strings.ToLower(serviceName))
if isMultinode {
// Check PodCliqueScalingGroup: spec.replicas == status.availableReplicas
if usesPCSG {
ok, reason, serviceStatus := CheckPCSGReady(ctx, client, resourceName, dgd.Namespace, logger)
serviceStatuses[serviceName] = serviceStatus
if !ok {
notReadyComponents = append(notReadyComponents, fmt.Sprintf("pcsg/%s: %s", resourceName, reason))
}
} else {
// Check PodClique: spec.replicas == status.readyReplicas
ok, reason, serviceStatus := CheckPodCliqueReady(ctx, client, resourceName, dgd.Namespace, logger)
serviceStatuses[serviceName] = serviceStatus
if !ok {
......
......@@ -54,6 +54,27 @@ func shellQuoteForBashC(s string) string {
return s
}
// containerHasArg tells whether the container's Args already carry the given
// flag/value pair, which lets callers inject flags idempotently. A pair is
// considered present when any single argument contains "flag value" or
// "flag=value" as a substring (e.g. inside a shell string), or when two
// adjacent arguments are exactly flag followed by value. A nil container
// never has the argument.
func containerHasArg(container *corev1.Container, flag, value string) bool {
	if container == nil {
		return false
	}

	spaced, assigned := flag+" "+value, flag+"="+value
	args := container.Args
	for idx := range args {
		current := args[idx]
		switch {
		case strings.Contains(current, spaced), strings.Contains(current, assigned):
			return true
		case current == flag && idx+1 < len(args) && args[idx+1] == value:
			return true
		}
	}
	return false
}
func injectFlagsIntoContainerCommand(container *corev1.Container, flags string, needsShell bool, framework string) {
if len(container.Command) > 0 && isPythonCommand(container.Command[0]) {
// Direct python command case
......
......@@ -43,6 +43,11 @@ const (
// Pod names follow formats like: <pcs-name>-<pcs-index>-<pcsg-name>-<pcsg-index>-<pclq-name>-<random>
// The random string and hyphens consume additional characters, leaving 45 for the resource names.
maxCombinedResourceNameLength = 45
// backendFrameworkVLLM is the spec.backendFramework value that identifies
// a vLLM deployment. Duplicated here (instead of importing from
// internal/dynamo) to avoid a webhook -> dynamo import cycle.
backendFrameworkVLLM = "vllm"
)
// DynamoGraphDeploymentValidator validates DynamoGraphDeployment resources.
......@@ -50,21 +55,24 @@ const (
type DynamoGraphDeploymentValidator struct {
// deployment is the DynamoGraphDeployment being validated.
deployment *nvidiacomv1alpha1.DynamoGraphDeployment
mgr ctrl.Manager // Optional: for API group detection via discovery client
// groveEnabled mirrors the operator's runtime config
// (global.grove.enabled); isGrovePathway returns false when it is false.
groveEnabled bool
}
// NewDynamoGraphDeploymentValidator creates a new validator for DynamoGraphDeployment.
func NewDynamoGraphDeploymentValidator(deployment *nvidiacomv1alpha1.DynamoGraphDeployment) *DynamoGraphDeploymentValidator {
// groveEnabled should reflect the operator's runtime config (global.grove.enabled).
func NewDynamoGraphDeploymentValidator(deployment *nvidiacomv1alpha1.DynamoGraphDeployment, groveEnabled bool) *DynamoGraphDeploymentValidator {
return &DynamoGraphDeploymentValidator{
deployment: deployment,
mgr: nil,
groveEnabled: groveEnabled,
}
}
// NewDynamoGraphDeploymentValidatorWithManager creates a validator with a manager for API group detection.
func NewDynamoGraphDeploymentValidatorWithManager(deployment *nvidiacomv1alpha1.DynamoGraphDeployment, mgr ctrl.Manager) *DynamoGraphDeploymentValidator {
func NewDynamoGraphDeploymentValidatorWithManager(deployment *nvidiacomv1alpha1.DynamoGraphDeployment, mgr ctrl.Manager, groveEnabled bool) *DynamoGraphDeploymentValidator {
return &DynamoGraphDeploymentValidator{
deployment: deployment,
mgr: mgr,
groveEnabled: groveEnabled,
}
}
......@@ -176,6 +184,44 @@ func (v *DynamoGraphDeploymentValidator) validateImmutableFields(old *nvidiacomv
}
}
// Validate inter-pod GMS layout and failover immutability.
//
// Flipping the inter-pod GMS layout or toggling failover within an
// inter-pod layout both change the PodClique topology (weight-server PCLQ,
// per-rank engine PCLQs, shadow PCLQs, DRA ResourceClaimTemplates), which
// Grove cannot transform in place. Force the user to delete and recreate.
for serviceName, newService := range v.deployment.Spec.Services {
oldService, exists := old.Spec.Services[serviceName]
if !exists {
continue
}
oldInterPodGMS := oldService.IsInterPodGMSEnabled()
newInterPodGMS := newService.IsInterPodGMSEnabled()
if oldInterPodGMS != newInterPodGMS {
errs = append(errs, fmt.Errorf(
"spec.services[%s].gpuMemoryService.mode: the inter-pod GMS layout cannot be toggled after creation; "+
"delete and recreate the DynamoGraphDeployment",
serviceName,
))
}
oldInterPodFailover := oldService.IsInterPodFailoverEnabled()
newInterPodFailover := newService.IsInterPodFailoverEnabled()
if oldInterPodFailover != newInterPodFailover {
errs = append(errs, fmt.Errorf(
"spec.services[%s].failover: inter-pod GMS failover cannot be toggled after creation; "+
"delete and recreate the DynamoGraphDeployment",
serviceName,
))
}
if oldInterPodFailover && newInterPodFailover && oldService.Failover.NumShadows != newService.Failover.NumShadows {
errs = append(errs, fmt.Errorf(
"spec.services[%s].failover.numShadows is immutable for inter-pod GMS failover; "+
"delete and recreate the DynamoGraphDeployment to change it",
serviceName,
))
}
}
// Validate topology constraint immutability
if err := v.validateTopologyConstraintImmutability(old); err != nil {
errs = append(errs, err)
......@@ -279,6 +325,41 @@ func (v *DynamoGraphDeploymentValidator) validateReplicasChanges(old *nvidiacomv
// validateService validates a single service configuration using SharedSpecValidator.
// Returns warnings and error.
func (v *DynamoGraphDeploymentValidator) validateService(ctx context.Context, serviceName string, service *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec) (admission.Warnings, error) {
// The inter-pod GMS layout (with or without failover) requires the Grove
// pathway: the weight-server pod, per-rank PCLQs, and DRA ResourceClaim
// templates are all wired at the PodCliqueScalingGroup level, which only
// the Grove renderer produces.
if service.IsInterPodGMSEnabled() && !v.isGrovePathway() {
if !v.groveEnabled {
return nil, fmt.Errorf(
"spec.services[%s]: gpuMemoryService.mode=%q requires the Grove pathway, but Grove is disabled at the operator level (global.grove.enabled=false)",
serviceName, nvidiacomv1alpha1.GMSModeInterPod)
}
return nil, fmt.Errorf(
"spec.services[%s]: gpuMemoryService.mode=%q requires the Grove pathway; remove or unset the %q annotation (currently %q)",
serviceName, nvidiacomv1alpha1.GMSModeInterPod,
consts.KubeAnnotationEnableGrove, v.deployment.Annotations[consts.KubeAnnotationEnableGrove])
}
// The inter-pod GMS layout is currently implemented only for vLLM (the
// engine relies on vLLM-specific runtime hooks like --load-format gms and
// DYN_VLLM_GMS_SHADOW_MODE that activate the GMS client path). Fail fast
// at admission rather than producing a broken deployment when another or
// no backend is configured — an empty BackendFramework means the operator
// cannot confirm the engine speaks vLLM, which is a hard prerequisite for
// inter-pod GMS (both standalone and with failover).
if service.IsInterPodGMSEnabled() &&
v.deployment.Spec.BackendFramework != backendFrameworkVLLM {
detected := v.deployment.Spec.BackendFramework
if detected == "" {
detected = "<unset>"
}
return nil, fmt.Errorf(
"spec.services[%s]: the inter-pod GMS layout (gpuMemoryService.mode=%q) is currently supported only for vLLM (detected: %s); "+
"set spec.backendFramework=%q",
serviceName, nvidiacomv1alpha1.GMSModeInterPod, detected, backendFrameworkVLLM)
}
// Validate service name length constraints for Grove PodCliqueSet naming
// Only validate when Grove pathway may be in use
if v.isGrovePathway() {
......@@ -318,44 +399,69 @@ func (v *DynamoGraphDeploymentValidator) validateServiceNameLength(serviceName s
dgdName := v.deployment.Name
lowerServiceName := strings.ToLower(serviceName)
// Check if this is a multinode service
isMultinode := service.GetNumberOfNodes() > 1
isInterPodGMS := service.IsInterPodGMSEnabled()
// Determine the longest PodClique name that will be generated.
// Grove validates: len(PCS name) + len(PCSG name) + len(PCLQ name) <= 45
var longestPCLQName string
var pcsgName string
switch {
case isInterPodGMS:
// GMS services always get a PCSG named after the service.
// Longest PCLQ name is "serviceName-gms-0" (len + 6) or "serviceName-wkr-N".
pcsgName = lowerServiceName
gmsName := fmt.Sprintf("%s-%s-0", lowerServiceName, consts.GroveRoleSuffixGMS)
longestPCLQName = gmsName
if isMultinode {
// For multinode: PodCliqueSet name + PodCliqueScalingGroup name + PodClique name (with leader suffix)
// The PodClique name is serviceName + "-ldr" (using GroveRoleSuffixLeader)
leaderPodCliqueName := lowerServiceName + "-" + consts.GroveRoleSuffixLeader
combinedLength := len(dgdName) + len(lowerServiceName) + len(leaderPodCliqueName)
// For high node counts, "svc-wkr-NN" can be longer than "svc-gms-0"
maxRank := service.GetNumberOfNodes() - 1
workerName := fmt.Sprintf("%s-%s-%d", lowerServiceName, consts.GroveRoleSuffixWorker, maxRank)
if len(workerName) > len(longestPCLQName) {
longestPCLQName = workerName
}
}
case isMultinode:
pcsgName = lowerServiceName
longestPCLQName = lowerServiceName + "-" + consts.GroveRoleSuffixLeader
default:
// Single-node non-GMS: no PCSG, only PCS + PCLQ
combinedLength := len(dgdName) + len(lowerServiceName)
if combinedLength > maxCombinedResourceNameLength {
return fmt.Errorf("spec.services[%s]: combined resource name length %d exceeds %d-character limit required for pod naming. "+
"Consider shortening the DynamoGraphDeployment name '%s' (length %d) or service name '%s' (length %d). "+
"For multinode services, the combined length of DGD name + service name + service name with role suffix (e.g., '%s-ldr') must not exceed %d characters",
"The combined length of DGD name + service name must not exceed %d characters",
serviceName, combinedLength, maxCombinedResourceNameLength,
dgdName, len(dgdName), serviceName, len(serviceName),
lowerServiceName, maxCombinedResourceNameLength)
maxCombinedResourceNameLength)
}
return nil
}
} else {
// For single-node: PodCliqueSet name + PodClique name
combinedLength := len(dgdName) + len(lowerServiceName)
// For services with PCSG: PCS name + PCSG name + longest PCLQ name
combinedLength := len(dgdName) + len(pcsgName) + len(longestPCLQName)
if combinedLength > maxCombinedResourceNameLength {
return fmt.Errorf("spec.services[%s]: combined resource name length %d exceeds %d-character limit required for pod naming. "+
"Consider shortening the DynamoGraphDeployment name '%s' (length %d) or service name '%s' (length %d). "+
"The combined length of DGD name + service name must not exceed %d characters",
"The combined length of DGD name + PCSG name + longest PodClique name ('%s') must not exceed %d characters",
serviceName, combinedLength, maxCombinedResourceNameLength,
dgdName, len(dgdName), serviceName, len(serviceName),
maxCombinedResourceNameLength)
}
longestPCLQName, maxCombinedResourceNameLength)
}
return nil
}
// isGrovePathway reports whether the Grove pathway may be used for this
// deployment. Two conditions must hold: Grove is enabled at the operator
// level (global.grove.enabled), and the per-DGD nvidia.com/enable-grove
// annotation is not explicitly set to "false" (compared case-insensitively).
// The annotation check is conservative — a missing annotation counts as
// "Grove may be used", so Grove-specific validation still runs.
func (v *DynamoGraphDeploymentValidator) isGrovePathway() bool {
	if !v.groveEnabled {
		return false
	}

	annotations := v.deployment.Annotations
	if annotations == nil {
		return true
	}
	optOut := strings.ToLower(annotations[consts.KubeAnnotationEnableGrove])
	return optOut != consts.KubeLabelValueFalse
}
......@@ -797,18 +903,22 @@ func (v *DynamoGraphDeploymentValidator) validateNoRestartDuringRollingUpdate(ol
}
// validateFailoverRequiresDiscoveryMode checks that when any service has
// failover enabled, the DGD carries the nvidia.com/dynamo-kube-discovery-mode
// annotation set to "container". Failover pods produce multiple engine
// containers that each need their own discovery identity.
// intra-pod failover enabled, the DGD carries the nvidia.com/dynamo-kube-discovery-mode
// annotation set to "container". Intra-pod failover produces multiple engine
// containers within the same pod that each need their own discovery identity.
// Inter-pod failover uses separate pods, so the annotation is not required.
func (v *DynamoGraphDeploymentValidator) validateFailoverRequiresDiscoveryMode() error {
hasFailover := false
hasIntraPodFailover := false
for _, svc := range v.deployment.Spec.Services {
if svc != nil && svc.Failover != nil && svc.Failover.Enabled {
hasFailover = true
if svc == nil || svc.Failover == nil || !svc.Failover.Enabled {
continue
}
if svc.Failover.Mode == nvidiacomv1alpha1.GMSModeIntraPod {
hasIntraPodFailover = true
break
}
}
if !hasFailover {
if !hasIntraPodFailover {
return nil
}
......
......@@ -43,15 +43,18 @@ const (
// DynamoGraphDeploymentHandler is the webhook handler for
// DynamoGraphDeployment; it builds a validator from these fields for each
// create/update request.
type DynamoGraphDeploymentHandler struct {
// mgr is passed to the validator for API group detection.
mgr manager.Manager
// operatorPrincipal is the full Kubernetes SA username of the operator,
// used to authorize replica changes on scaling-adapter-enabled services.
operatorPrincipal string
// groveEnabled reflects the operator's runtime config (global.grove.enabled).
groveEnabled bool
}
// NewDynamoGraphDeploymentHandler creates a new handler for DynamoGraphDeployment Webhook.
// operatorPrincipal is the full Kubernetes SA username of the operator, used to authorize
// replica changes on scaling-adapter-enabled services (#7656).
func NewDynamoGraphDeploymentHandler(mgr manager.Manager, operatorPrincipal string) *DynamoGraphDeploymentHandler {
// groveEnabled reflects the operator's runtime config (global.grove.enabled).
func NewDynamoGraphDeploymentHandler(mgr manager.Manager, operatorPrincipal string, groveEnabled bool) *DynamoGraphDeploymentHandler {
return &DynamoGraphDeploymentHandler{
mgr: mgr,
operatorPrincipal: operatorPrincipal,
groveEnabled: groveEnabled,
}
}
......@@ -67,7 +70,7 @@ func (h *DynamoGraphDeploymentHandler) ValidateCreate(ctx context.Context, obj r
logger.Info("validate create", "name", deployment.Name, "namespace", deployment.Namespace)
// Create validator with manager for API group detection and perform validation
validator := NewDynamoGraphDeploymentValidatorWithManager(deployment, h.mgr)
validator := NewDynamoGraphDeploymentValidatorWithManager(deployment, h.mgr, h.groveEnabled)
return validator.Validate(ctx)
}
......@@ -94,7 +97,7 @@ func (h *DynamoGraphDeploymentHandler) ValidateUpdate(ctx context.Context, oldOb
}
// Create validator with manager for API group detection and perform validation.
validator := NewDynamoGraphDeploymentValidatorWithManager(newDeployment, h.mgr)
validator := NewDynamoGraphDeploymentValidatorWithManager(newDeployment, h.mgr, h.groveEnabled)
warnings, err := validator.Validate(ctx)
if err != nil {
return warnings, err
......
......@@ -42,6 +42,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
tests := []struct {
name string
deployment *nvidiacomv1alpha1.DynamoGraphDeployment
groveEnabled bool
wantErr bool
errMsg string
errContains bool
......@@ -511,6 +512,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
// Service name length validation tests
{
name: "service name too long for single-node deployment",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "verylongdynamographdeploymentname",
......@@ -528,6 +530,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
},
{
name: "service name too long for multinode deployment",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "vllm-agg",
......@@ -549,6 +552,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
},
{
name: "valid service name length for single-node",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "dgd",
......@@ -564,6 +568,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
},
{
name: "valid service name length for multinode",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "dgd",
......@@ -583,6 +588,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
},
{
name: "boundary case - exactly at 45 char limit for single-node",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
// DGD name (3 chars) + service name (42 chars) = 45 chars (exactly at limit)
......@@ -600,6 +606,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
},
{
name: "boundary case - one char over limit for single-node",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
// DGD name (3 chars) + service name (43 chars) = 46 chars (over limit)
......@@ -620,6 +627,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
// Grove disabled tests - service name length validation should be skipped
{
name: "long service name allowed when Grove disabled via annotation",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "verylongdynamographdeploymentname",
......@@ -638,6 +646,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
},
{
name: "long multinode service name allowed when Grove disabled via annotation",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "vllm-agg",
......@@ -660,6 +669,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
},
{
name: "Grove annotation case insensitive - FALSE",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "verylongdynamographdeploymentname",
......@@ -676,6 +686,280 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
},
wantErr: false,
},
// GMS failover validation test cases
{
name: "valid GMS failover single-node with GPU",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-gms",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
NumShadows: 1,
},
Resources: &nvidiacomv1alpha1.Resources{
Limits: &nvidiacomv1alpha1.ResourceItem{GPU: "8"},
},
},
},
},
},
wantErr: false,
},
{
name: "valid standalone inter-pod GMS (no failover) single-node with GPU",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-gms-standalone",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Resources: &nvidiacomv1alpha1.Resources{
Limits: &nvidiacomv1alpha1.ResourceItem{GPU: "8"},
},
},
},
},
},
wantErr: false,
},
{
name: "GMS failover without GPU",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-gms",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
NumShadows: 1,
},
},
},
},
},
wantErr: true,
errContains: true,
// validateGPUMemoryService fires first when the inter-pod layout
// is declared without any GPU resources.
errMsg: "requires resources.limits.gpu",
},
{
name: "inter-pod GMS on frontend component rejected",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-gms",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"fe": {
ComponentType: "frontend",
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Resources: &nvidiacomv1alpha1.Resources{
Limits: &nvidiacomv1alpha1.ResourceItem{GPU: "1"},
},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "GPU memory service is only supported for worker components",
},
{
name: "GMS failover requires Grove pathway - annotation disabled",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-gms",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationEnableGrove: "false",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
NumShadows: 1,
},
Resources: &nvidiacomv1alpha1.Resources{
Limits: &nvidiacomv1alpha1.ResourceItem{GPU: "8"},
},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "requires the Grove pathway",
},
{
name: "GMS failover requires Grove pathway - operator grove disabled",
groveEnabled: false,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-gms",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
NumShadows: 1,
},
Resources: &nvidiacomv1alpha1.Resources{
Limits: &nvidiacomv1alpha1.ResourceItem{GPU: "8"},
},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "requires the Grove pathway",
},
{
name: "inter-pod GMS rejected on non-vLLM backend",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-gms",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "sglang",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
NumShadows: 1,
},
Resources: &nvidiacomv1alpha1.Resources{
Limits: &nvidiacomv1alpha1.ResourceItem{GPU: "8"},
},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "currently supported only for vLLM",
},
{
name: "inter-pod GMS rejected when backendFramework is unset",
groveEnabled: true,
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-gms",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
// BackendFramework intentionally left empty — the
// inter-pod gate must fail closed rather than silently
// accept a deployment whose engine may not speak vLLM.
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
NumShadows: 1,
},
Resources: &nvidiacomv1alpha1.Resources{
Limits: &nvidiacomv1alpha1.ResourceItem{GPU: "8"},
},
},
},
},
},
wantErr: true,
errContains: true,
errMsg: "currently supported only for vLLM",
},
{
name: "GMS failover disabled is valid without GPU",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-gms",
Namespace: "default",
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: false,
},
},
},
},
},
wantErr: false,
},
// Annotation validation test cases
{
name: "valid annotation vllm-distributed-executor-backend=mp",
......@@ -1245,7 +1529,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
validator := NewDynamoGraphDeploymentValidator(tt.deployment)
validator := NewDynamoGraphDeploymentValidator(tt.deployment, tt.groveEnabled)
_, err := validator.Validate(context.Background())
if (err != nil) != tt.wantErr {
......@@ -1928,11 +2212,119 @@ func TestDynamoGraphDeploymentValidator_ValidateUpdate(t *testing.T) {
},
wantErr: false,
},
{
name: "toggling GMS failover is immutable",
oldDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
},
},
},
},
newDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
NumShadows: 1,
},
},
},
},
},
wantErr: true,
errMsg: "failover cannot be toggled after creation",
},
{
name: "toggling inter-pod GMS layout is immutable",
oldDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {},
},
},
},
newDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
},
},
},
},
wantErr: true,
errMsg: "inter-pod GMS layout cannot be toggled after creation",
},
{
name: "changing numShadows is immutable",
oldDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
NumShadows: 1,
},
},
},
},
},
newDeployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
BackendFramework: "vllm",
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"worker": {
ComponentType: consts.ComponentTypeWorker,
GPUMemoryService: &nvidiacomv1alpha1.GPUMemoryServiceSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
},
Failover: &nvidiacomv1alpha1.FailoverSpec{
Enabled: true,
Mode: nvidiacomv1alpha1.GMSModeInterPod,
NumShadows: 3,
},
},
},
},
},
wantErr: true,
errMsg: "failover.numShadows is immutable",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
validator := NewDynamoGraphDeploymentValidator(tt.newDeployment)
validator := NewDynamoGraphDeploymentValidator(tt.newDeployment, true)
// Pass nil userInfo and empty operatorPrincipal - these tests don't modify replicas, so it's safe
warnings, err := validator.ValidateUpdate(tt.oldDeployment, nil, "")
......
......@@ -19,6 +19,7 @@ package validation
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
......@@ -129,12 +130,12 @@ func (v *SharedSpecValidator) Validate(ctx context.Context) (admission.Warnings,
return nil, err
}
// Validate GPU memory service configuration
// Validate GPU memory service configuration (intra-pod GMS)
if err := v.validateGPUMemoryService(); err != nil {
return nil, err
}
// Validate failover configuration
// Validate GMS failover constraints
if err := v.validateFailover(); err != nil {
return nil, err
}
......@@ -266,50 +267,128 @@ func (v *SharedSpecValidator) validateFrontendSidecar() error {
return nil
}
// validateFailover validates the failover configuration for a service.
// Structural checks only — DRA/DeviceClass availability is checked by the controller
// at reconcile time (same pattern as Grove orchestrator availability).
// parseGPUCount returns the integer GPU count declared in r. A non-empty
// Limits.GPU takes precedence; otherwise a non-empty Requests.GPU is used.
// A nil r, or one with no GPU value in either place, yields (0, nil). A value
// that strconv.Atoi cannot parse yields 0 and an error that wraps the parse
// failure and quotes the offending string.
func parseGPUCount(r *nvidiacomv1alpha1.Resources) (int, error) {
	if r == nil {
		return 0, nil
	}

	// Limits wins; Requests is consulted only when Limits carries no GPU value.
	raw := ""
	if r.Limits != nil {
		raw = r.Limits.GPU
	}
	if raw == "" && r.Requests != nil {
		raw = r.Requests.GPU
	}
	if raw == "" {
		return 0, nil
	}

	count, convErr := strconv.Atoi(raw)
	if convErr != nil {
		return 0, fmt.Errorf("invalid value %q: %w", raw, convErr)
	}
	return count, nil
}
// validateFailover validates GMS failover configuration constraints.
//
// The layout (intra-pod sidecar vs. inter-pod weight-server pod) is declared
// by gpuMemoryService.mode. failover is an independent toggle: when enabled,
// failover.mode MUST match gpuMemoryService.mode so the two knobs describe a
// consistent topology. It is also valid to configure gpuMemoryService without
// failover (no shadows; a single engine + GMS server pair) — see
// validateGPUMemoryService below.
func (v *SharedSpecValidator) validateFailover() error {
if v.spec.Failover == nil || !v.spec.Failover.Enabled {
// When failover.enabled is false the sub-fields (mode, numShadows)
// are dormant configuration and the render path ignores them
// (GetNumShadows returns 0). We deliberately do not validate them
// here so users can stage a failover config before flipping
// enabled=true — matching the K8s convention that fields on a
// disabled feature are not constrained.
return nil
}
// Failover requires GPU memory service
var errs []error
// For intra-pod mode: require gpuMemoryService.enabled and validate mode matching.
if v.spec.Failover.Mode == nvidiacomv1alpha1.GMSModeIntraPod {
if v.spec.GPUMemoryService == nil || !v.spec.GPUMemoryService.Enabled {
return fmt.Errorf(
"%s.failover: failover requires gpuMemoryService.enabled to be true",
v.fieldPath)
errs = append(errs, fmt.Errorf(
"%s.failover: intraPod failover requires gpuMemoryService.enabled to be true",
v.fieldPath))
} else if v.spec.GPUMemoryService.Mode != "" &&
v.spec.GPUMemoryService.Mode != nvidiacomv1alpha1.GMSModeIntraPod {
errs = append(errs, fmt.Errorf(
"%s.failover: failover.mode %q must match gpuMemoryService.mode %q",
v.fieldPath, v.spec.Failover.Mode, v.spec.GPUMemoryService.Mode))
}
// Failover mode must match GMS mode when both are set
if v.spec.Failover.Mode != "" && v.spec.GPUMemoryService.Mode != "" &&
v.spec.Failover.Mode != v.spec.GPUMemoryService.Mode {
return fmt.Errorf(
"%s.failover: failover.mode %q must match gpuMemoryService.mode %q",
v.fieldPath, v.spec.Failover.Mode, v.spec.GPUMemoryService.Mode)
// intraPod is a fixed 1 primary + 1 shadow sidecar layout; numShadows
// is meaningless here and any value other than the implicit 1 is
// almost certainly a configuration error (user probably wanted
// mode=interPod).
if v.spec.Failover.NumShadows != 0 && v.spec.Failover.NumShadows != 1 {
errs = append(errs, fmt.Errorf(
"%s.failover.numShadows=%d is invalid for mode=%q: intraPod uses a fixed 1 primary + 1 shadow sidecar; "+
"use failover.mode=%q to configure numShadows",
v.fieldPath, v.spec.Failover.NumShadows, nvidiacomv1alpha1.GMSModeIntraPod, nvidiacomv1alpha1.GMSModeInterPod))
}
}
// interPod failover is not yet supported
// For inter-pod mode: require the inter-pod GMS layout (gpuMemoryService
// with mode=interPod) so failover hot-spares are added on top of an
// already-declared weight-server pod layout.
if v.spec.Failover.Mode == nvidiacomv1alpha1.GMSModeInterPod {
return fmt.Errorf(
"%s.failover: mode \"interPod\" is not yet supported",
v.fieldPath)
if v.spec.GPUMemoryService == nil || !v.spec.GPUMemoryService.Enabled {
errs = append(errs, fmt.Errorf(
"%s.failover: interPod failover requires gpuMemoryService.enabled=true and gpuMemoryService.mode=%q",
v.fieldPath, nvidiacomv1alpha1.GMSModeInterPod))
} else if v.spec.GPUMemoryService.Mode != nvidiacomv1alpha1.GMSModeInterPod {
// An unset gpuMemoryService.mode defaults to the intra-pod sidecar
// layout, which is incompatible with inter-pod failover; the user
// must set gpuMemoryService.mode=interPod explicitly.
detected := string(v.spec.GPUMemoryService.Mode)
if detected == "" {
detected = "<unset>"
}
errs = append(errs, fmt.Errorf(
"%s.failover: interPod failover requires gpuMemoryService.mode=%q (got %q)",
v.fieldPath, nvidiacomv1alpha1.GMSModeInterPod, detected))
}
return nil
if v.spec.Failover.NumShadows < 1 {
errs = append(errs, fmt.Errorf("%s.failover.numShadows must be >= 1", v.fieldPath))
}
gpuCount, err := parseGPUCount(v.spec.Resources)
if err != nil {
errs = append(errs, fmt.Errorf("%s.resources.limits.gpu: %w", v.fieldPath, err))
} else if gpuCount < 1 {
errs = append(errs, fmt.Errorf("%s: GMS failover requires at least 1 GPU in resources.limits.gpu", v.fieldPath))
}
switch v.spec.ComponentType {
case consts.ComponentTypeEPP, consts.ComponentTypeFrontend, consts.ComponentTypePlanner:
errs = append(errs, fmt.Errorf("%s: GMS failover is not supported for componentType %q", v.fieldPath, v.spec.ComponentType))
}
}
return errors.Join(errs...)
}
// validateGPUMemoryService validates gpuMemoryService constraints.
//
// gpuMemoryService declares the GMS layout (intra-pod sidecar vs. inter-pod
// dedicated weight-server pod) and may be enabled independently of failover:
// the intra-pod layout gives the engine a GMS sidecar in the same pod, and
// the inter-pod layout gives it a dedicated weight-server pod paired with one
// engine pod. Failover adds shadow engine pods on top of the declared layout
// (see validateFailover); it is not the sole way to request the inter-pod
// layout.
func (v *SharedSpecValidator) validateGPUMemoryService() error {
if v.spec.GPUMemoryService == nil || !v.spec.GPUMemoryService.Enabled {
return nil
}
if v.spec.GPUMemoryService.Mode == nvidiacomv1alpha1.GMSModeInterPod {
return fmt.Errorf(
"%s.gpuMemoryService: mode \"interPod\" is not yet supported",
v.fieldPath)
}
isWorker := v.spec.ComponentType == consts.ComponentTypeWorker ||
v.spec.ComponentType == consts.ComponentTypePrefill ||
v.spec.ComponentType == consts.ComponentTypeDecode
......@@ -319,27 +398,7 @@ func (v *SharedSpecValidator) validateGPUMemoryService() error {
v.fieldPath)
}
if v.spec.Resources == nil {
return fmt.Errorf(
"%s.gpuMemoryService: GPU memory service requires resources.limits.gpu >= 1",
v.fieldPath)
}
gpuStr := ""
switch {
case v.spec.Resources.Limits != nil && v.spec.Resources.Limits.GPU != "":
gpuStr = v.spec.Resources.Limits.GPU
case v.spec.Resources.Requests != nil && v.spec.Resources.Requests.GPU != "":
gpuStr = v.spec.Resources.Requests.GPU
}
if gpuStr == "" {
return fmt.Errorf(
"%s.gpuMemoryService: GPU memory service requires resources.limits.gpu >= 1",
v.fieldPath)
}
gpuCount, err := strconv.Atoi(gpuStr)
gpuCount, err := parseGPUCount(v.spec.Resources)
if err != nil || gpuCount < 1 {
return fmt.Errorf(
"%s.gpuMemoryService: GPU memory service requires resources.limits.gpu >= 1",
......
......@@ -396,6 +396,195 @@ func TestSharedSpecValidator_Validate_Warnings(t *testing.T) {
}
}
// TestSharedSpecValidator_Failover_ModeConstraints runs the shared-spec
// validator against GPU-backed worker specs that combine gpuMemoryService and
// failover settings, and checks which combinations are accepted:
//
//   - gpuMemoryService alone (mode interPod, intraPod, or unset) passes.
//   - interPod failover fails without gpuMemoryService, fails when
//     gpuMemoryService.mode is intraPod, and passes when it is interPod.
//   - intraPod failover passes with numShadows=1 and fails with numShadows=2.
//   - A failover block with enabled=false passes even though numShadows is set.
//
// For rejected specs the error text must contain the case's errSubstr.
func TestSharedSpecValidator_Failover_ModeConstraints(t *testing.T) {
	// Every case shares the same single-GPU limit.
	oneGPU := &nvidiacomv1alpha1.Resources{
		Limits: &nvidiacomv1alpha1.ResourceItem{GPU: "1"},
	}

	// worker builds a worker spec around the given GMS and failover blocks;
	// passing nil leaves the corresponding field unset.
	worker := func(
		gms *nvidiacomv1alpha1.GPUMemoryServiceSpec,
		fo *nvidiacomv1alpha1.FailoverSpec,
	) *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec {
		return &nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
			ComponentType:    consts.ComponentTypeWorker,
			Resources:        oneGPU,
			GPUMemoryService: gms,
			Failover:         fo,
		}
	}

	cases := []struct {
		name      string
		spec      *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec
		expectErr bool
		errSubstr string
	}{
		{
			name: "standalone inter-pod GMS (no failover) is accepted",
			spec: worker(
				&nvidiacomv1alpha1.GPUMemoryServiceSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeInterPod},
				nil,
			),
		},
		{
			name: "sidecar gpuMemoryService mode=intraPod is accepted",
			spec: worker(
				&nvidiacomv1alpha1.GPUMemoryServiceSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeIntraPod},
				nil,
			),
		},
		{
			name: "sidecar gpuMemoryService mode unset is accepted",
			spec: worker(
				&nvidiacomv1alpha1.GPUMemoryServiceSpec{Enabled: true},
				nil,
			),
		},
		{
			name: "inter-pod failover requires gpuMemoryService.enabled",
			spec: worker(
				nil,
				&nvidiacomv1alpha1.FailoverSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeInterPod, NumShadows: 1},
			),
			expectErr: true,
			errSubstr: "gpuMemoryService.enabled=true",
		},
		{
			name: "inter-pod failover requires gpuMemoryService.mode=interPod",
			spec: worker(
				&nvidiacomv1alpha1.GPUMemoryServiceSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeIntraPod},
				&nvidiacomv1alpha1.FailoverSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeInterPod, NumShadows: 1},
			),
			expectErr: true,
			errSubstr: "requires gpuMemoryService.mode",
		},
		{
			name: "inter-pod failover with matching gpuMemoryService.mode=interPod is accepted",
			spec: worker(
				&nvidiacomv1alpha1.GPUMemoryServiceSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeInterPod},
				&nvidiacomv1alpha1.FailoverSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeInterPod, NumShadows: 1},
			),
		},
		{
			// The failover block is disabled here, so its numShadows value
			// is expected to be left unvalidated and the spec accepted.
			name: "numShadows with failover.enabled=false is accepted (dormant config)",
			spec: worker(
				&nvidiacomv1alpha1.GPUMemoryServiceSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeInterPod},
				&nvidiacomv1alpha1.FailoverSpec{Enabled: false, NumShadows: 2},
			),
		},
		{
			name: "intraPod failover with numShadows=2 is rejected",
			spec: worker(
				&nvidiacomv1alpha1.GPUMemoryServiceSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeIntraPod},
				&nvidiacomv1alpha1.FailoverSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeIntraPod, NumShadows: 2},
			),
			expectErr: true,
			errSubstr: "numShadows",
		},
		{
			name: "intraPod failover with numShadows=1 is accepted",
			spec: worker(
				&nvidiacomv1alpha1.GPUMemoryServiceSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeIntraPod},
				&nvidiacomv1alpha1.FailoverSpec{Enabled: true, Mode: nvidiacomv1alpha1.GMSModeIntraPod, NumShadows: 1},
			),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			validator := NewSharedSpecValidator(tc.spec, "spec", "default-my-dgd")
			_, err := validator.Validate(context.Background())

			// Accepted specs must validate cleanly.
			if !tc.expectErr {
				if err != nil {
					t.Fatalf("unexpected error: %v", err)
				}
				return
			}

			// Rejected specs must fail, and with the expected message fragment.
			if err == nil {
				t.Fatalf("expected error, got nil")
			}
			if tc.errSubstr == "" || contains(err.Error(), tc.errSubstr) {
				return
			}
			t.Errorf("error %q does not contain %q", err.Error(), tc.errSubstr)
		})
	}
}
// contains checks if s contains substr
func contains(s, substr string) bool {
return len(s) >= len(substr) && (s == substr || len(substr) == 0 ||
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment