Unverified Commit 45364b5f authored by mohammedabdulwahhab's avatar mohammedabdulwahhab Committed by GitHub
Browse files

feat(operator): enable failover API (#8157)


Signed-off-by: default avatarmohammedabdulwahhab <furkhan324@berkeley.edu>
Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent b13a440e
......@@ -10549,6 +10549,37 @@ spec:
x-kubernetes-list-type: map
required: []
type: object
failover:
description: |-
Failover configures active-passive GPU failover for this service.
When enabled, the main container is cloned into two engine containers
(active + standby) sharing GPUs via DRA. Requires gpuMemoryService.enabled.
properties:
enabled:
description: |-
Enabled activates failover mode. The main container is cloned into two
engine containers (active + standby) sharing GPUs via DRA. The standby
acquires the flock when the active engine fails.
type: boolean
mode:
default: intraPod
description: Mode selects the failover deployment topology. Must match gpuMemoryService.mode.
enum:
- intraPod
- interPod
type: string
numShadows:
default: 1
description: |-
NumShadows is the number of shadow (standby) engine containers per rank.
Reserved for future use — the operator currently creates exactly one shadow.
format: int32
maximum: 1
minimum: 1
type: integer
required:
- enabled
type: object
frontendSidecar:
description: |-
FrontendSidecar configures an auto-generated frontend sidecar container.
......
......@@ -10772,6 +10772,37 @@ spec:
x-kubernetes-list-type: map
required: []
type: object
failover:
description: |-
Failover configures active-passive GPU failover for this service.
When enabled, the main container is cloned into two engine containers
(active + standby) sharing GPUs via DRA. Requires gpuMemoryService.enabled.
properties:
enabled:
description: |-
Enabled activates failover mode. The main container is cloned into two
engine containers (active + standby) sharing GPUs via DRA. The standby
acquires the flock when the active engine fails.
type: boolean
mode:
default: intraPod
description: Mode selects the failover deployment topology. Must match gpuMemoryService.mode.
enum:
- intraPod
- interPod
type: string
numShadows:
default: 1
description: |-
NumShadows is the number of shadow (standby) engine containers per rank.
Reserved for future use — the operator currently creates exactly one shadow.
format: int32
maximum: 1
minimum: 1
type: integer
required:
- enabled
type: object
frontendSidecar:
description: |-
FrontendSidecar configures an auto-generated frontend sidecar container.
......
......@@ -184,6 +184,28 @@ type GPUMemoryServiceSpec struct {
DeviceClassName string `json:"deviceClassName,omitempty"`
}
// FailoverSpec configures active-passive failover for a worker component.
// Requires gpuMemoryService.enabled and the nvidia.com/dynamo-kube-discovery-mode: container
// annotation on the DGD.
type FailoverSpec struct {
// Enabled activates failover mode. The main container is cloned into two
// engine containers (active + standby) sharing GPUs via DRA. The standby
// acquires the flock when the active engine fails.
Enabled bool `json:"enabled"`
// Mode selects the failover deployment topology. Must match gpuMemoryService.mode.
// +kubebuilder:default=intraPod
// +kubebuilder:validation:Enum=intraPod;interPod
// +optional
Mode GPUMemoryServiceMode `json:"mode,omitempty"`
// NumShadows is the number of shadow (standby) engine containers per rank.
// Reserved for future use — the operator currently creates exactly one shadow.
// +kubebuilder:default=1
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=1
// +optional
NumShadows int32 `json:"numShadows,omitempty"`
}
// ScalingAdapter configures whether a service uses the DynamoGraphDeploymentScalingAdapter
// for replica management. When enabled, the DGDSA owns the replicas field and
// external autoscalers (HPA, KEDA, Planner) can control scaling via the Scale subresource.
......
......@@ -154,6 +154,12 @@ type DynamoComponentDeploymentSharedSpec struct {
// When enabled, a GMS sidecar is injected and GPU access is managed via DRA.
// +optional
GPUMemoryService *GPUMemoryServiceSpec `json:"gpuMemoryService,omitempty"`
// Failover configures active-passive GPU failover for this service.
// When enabled, the main container is cloned into two engine containers
// (active + standby) sharing GPUs via DRA. Requires gpuMemoryService.enabled.
// +optional
Failover *FailoverSpec `json:"failover,omitempty"`
}
type MultinodeSpec struct {
......
......@@ -567,6 +567,11 @@ func (in *DynamoComponentDeploymentSharedSpec) DeepCopyInto(out *DynamoComponent
*out = new(GPUMemoryServiceSpec)
**out = **in
}
if in.Failover != nil {
in, out := &in.Failover, &out.Failover
*out = new(FailoverSpec)
**out = **in
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamoComponentDeploymentSharedSpec.
......@@ -1229,6 +1234,21 @@ func (in *ExtraPodSpec) DeepCopy() *ExtraPodSpec {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FailoverSpec) DeepCopyInto(out *FailoverSpec) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FailoverSpec.
func (in *FailoverSpec) DeepCopy() *FailoverSpec {
if in == nil {
return nil
}
out := new(FailoverSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FrontendSidecarSpec) DeepCopyInto(out *FrontendSidecarSpec) {
*out = *in
......
......@@ -10549,6 +10549,37 @@ spec:
x-kubernetes-list-type: map
required: []
type: object
failover:
description: |-
Failover configures active-passive GPU failover for this service.
When enabled, the main container is cloned into two engine containers
(active + standby) sharing GPUs via DRA. Requires gpuMemoryService.enabled.
properties:
enabled:
description: |-
Enabled activates failover mode. The main container is cloned into two
engine containers (active + standby) sharing GPUs via DRA. The standby
acquires the flock when the active engine fails.
type: boolean
mode:
default: intraPod
description: Mode selects the failover deployment topology. Must match gpuMemoryService.mode.
enum:
- intraPod
- interPod
type: string
numShadows:
default: 1
description: |-
NumShadows is the number of shadow (standby) engine containers per rank.
Reserved for future use — the operator currently creates exactly one shadow.
format: int32
maximum: 1
minimum: 1
type: integer
required:
- enabled
type: object
frontendSidecar:
description: |-
FrontendSidecar configures an auto-generated frontend sidecar container.
......
......@@ -10772,6 +10772,37 @@ spec:
x-kubernetes-list-type: map
required: []
type: object
failover:
description: |-
Failover configures active-passive GPU failover for this service.
When enabled, the main container is cloned into two engine containers
(active + standby) sharing GPUs via DRA. Requires gpuMemoryService.enabled.
properties:
enabled:
description: |-
Enabled activates failover mode. The main container is cloned into two
engine containers (active + standby) sharing GPUs via DRA. The standby
acquires the flock when the active engine fails.
type: boolean
mode:
default: intraPod
description: Mode selects the failover deployment topology. Must match gpuMemoryService.mode.
enum:
- intraPod
- interPod
type: string
numShadows:
default: 1
description: |-
NumShadows is the number of shadow (standby) engine containers per rank.
Reserved for future use — the operator currently creates exactly one shadow.
format: int32
maximum: 1
minimum: 1
type: integer
required:
- enabled
type: object
frontendSidecar:
description: |-
FrontendSidecar configures an auto-generated frontend sidecar container.
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package dynamo
import (
"fmt"
"path/filepath"
"strconv"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
var failoverLockFile = filepath.Join(gmsSharedMountPath, "failover.lock")
const (
failoverEngineCount = 2
)
func isFailoverEnabled(component *v1alpha1.DynamoComponentDeploymentSharedSpec) bool {
return component.Failover != nil && component.Failover.Enabled
}
// buildFailoverPod clones the main container into two engine containers (active + standby).
// This runs AFTER applyGPUMemoryService, so the main container already has DRA claims,
// shared volume mount, and TMPDIR set. This function only handles engine duplication
// and failover-specific env vars.
//
// Non-main containers (e.g. frontend sidecar) are preserved in the final pod spec.
func buildFailoverPod(
podSpec *corev1.PodSpec,
numberOfNodes int32,
backendFramework BackendFramework,
) error {
if len(podSpec.Containers) == 0 {
return fmt.Errorf("pod spec must have at least one container for failover transformation")
}
mainContainer := podSpec.Containers[0]
sidecars := podSpec.Containers[1:]
engines := make([]corev1.Container, failoverEngineCount)
for i := range failoverEngineCount {
engines[i] = buildEngineContainer(mainContainer, i, commonconsts.DynamoSystemPort+i)
}
podSpec.Containers = append(engines, sidecars...)
// Backend-specific overrides
switch backendFramework {
case BackendFrameworkVLLM:
applyVLLMOverrides(podSpec, numberOfNodes)
default:
return fmt.Errorf("failover is currently supported only for vLLM (detected: %s)", backendFramework)
}
return nil
}
// buildEngineContainer clones the main container with ENGINE_ID and failover env vars.
// Each engine gets a unique system port and named port for probe targeting.
func buildEngineContainer(base corev1.Container, engineID int, systemPort int) corev1.Container {
engine := *base.DeepCopy()
engine.Name = fmt.Sprintf("engine-%d", engineID)
portName := fmt.Sprintf("system-%d", engineID)
engine.Ports = []corev1.ContainerPort{
{
Protocol: corev1.ProtocolTCP,
Name: portName,
ContainerPort: int32(systemPort),
},
}
// Env vars to remove: replaced by failover-specific values or intentionally omitted.
removeSet := map[string]bool{
"DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS": true,
"DYN_SYSTEM_PORT": true,
"DYN_SYSTEM_ENABLED": true,
"DYN_HEALTH_CHECK_ENABLED": true,
"CONTAINER_NAME": true,
}
var filtered []corev1.EnvVar
for _, env := range engine.Env {
if !removeSet[env.Name] {
filtered = append(filtered, env)
}
}
containerName := fmt.Sprintf("engine-%d", engineID)
failoverEnvs := []corev1.EnvVar{
{Name: "ENGINE_ID", Value: strconv.Itoa(engineID)},
{Name: "CONTAINER_NAME", Value: containerName},
{Name: "FAILOVER_LOCK_PATH", Value: failoverLockFile},
{Name: "DYN_SYSTEM_STARTING_HEALTH_STATUS", Value: "notready"},
{Name: "DYN_SYSTEM_PORT", Value: strconv.Itoa(systemPort)},
{Name: "DYN_SYSTEM_ENABLED", Value: "true"},
}
engine.Env = append(filtered, failoverEnvs...)
// Retarget HTTP probes to this engine's named port. Each engine runs its
// system server on a staggered port (e.g. 9090, 9091), and the probes
// inherited from the base container still reference the original port name.
portRef := intstr.FromString(portName)
if engine.StartupProbe != nil && engine.StartupProbe.HTTPGet != nil {
engine.StartupProbe.HTTPGet.Port = portRef
}
if engine.LivenessProbe != nil && engine.LivenessProbe.HTTPGet != nil {
engine.LivenessProbe.HTTPGet.Port = portRef
}
if engine.ReadinessProbe != nil && engine.ReadinessProbe.HTTPGet != nil {
engine.ReadinessProbe.HTTPGet.Port = portRef
}
return engine
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package dynamo
import (
"fmt"
"strconv"
"testing"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
// failoverPodSpec returns a pod spec that has already been transformed by
// applyGPUMemoryService (DRA claims, shared volume, TMPDIR set), including
// a frontend sidecar to verify sidecar preservation.
func failoverPodSpec() corev1.PodSpec {
httpPort := intstr.FromString("system")
return corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "main",
Image: "test-image:latest",
Command: []string{"python3", "-m", "dynamo.vllm"},
Env: []corev1.EnvVar{
{Name: "DYN_SYSTEM_PORT", Value: "9090"},
{Name: "DYN_SYSTEM_ENABLED", Value: "true"},
{Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: "true"},
{Name: "DYN_HEALTH_CHECK_ENABLED", Value: "true"},
{Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"},
{Name: "TMPDIR", Value: gmsSharedMountPath},
},
Ports: []corev1.ContainerPort{
{Name: "system", ContainerPort: 9090, Protocol: corev1.ProtocolTCP},
},
StartupProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{Path: "/health", Port: httpPort},
},
},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{Path: "/health", Port: httpPort},
},
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{Path: "/health", Port: httpPort},
},
},
Resources: corev1.ResourceRequirements{
Claims: []corev1.ResourceClaim{{Name: gmsDRAClaimName}},
},
VolumeMounts: []corev1.VolumeMount{
{Name: gmsSharedVolumeName, MountPath: gmsSharedMountPath},
},
},
{
Name: "frontend-sidecar",
Image: "test-image:latest",
},
},
}
}
// --- buildFailoverPod ---
func TestBuildFailoverPod_TwoEnginesPlusSidecar(t *testing.T) {
ps := failoverPodSpec()
err := buildFailoverPod(&ps, 1, BackendFrameworkVLLM)
require.NoError(t, err)
// 2 engines + 1 preserved sidecar
assert.Len(t, ps.Containers, 3)
assert.Equal(t, "engine-0", ps.Containers[0].Name)
assert.Equal(t, "engine-1", ps.Containers[1].Name)
assert.Equal(t, "frontend-sidecar", ps.Containers[2].Name)
}
func TestBuildFailoverPod_EmptyContainers(t *testing.T) {
ps := corev1.PodSpec{}
err := buildFailoverPod(&ps, 1, BackendFrameworkVLLM)
require.Error(t, err)
assert.Contains(t, err.Error(), "at least one container")
}
func TestBuildFailoverPod_RejectsNonVLLM(t *testing.T) {
ps := failoverPodSpec()
err := buildFailoverPod(&ps, 1, BackendFrameworkSGLang)
require.Error(t, err)
assert.Contains(t, err.Error(), "currently supported only for vLLM")
}
func TestBuildFailoverPod_EngineEnvVars(t *testing.T) {
ps := failoverPodSpec()
err := buildFailoverPod(&ps, 1, BackendFrameworkVLLM)
require.NoError(t, err)
for i := range 2 {
engine := ps.Containers[i]
env := envToMap(engine.Env)
assert.Equal(t, strconv.Itoa(i), env["ENGINE_ID"], "engine-%d ENGINE_ID", i)
assert.Equal(t, fmt.Sprintf("engine-%d", i), env["CONTAINER_NAME"], "engine-%d CONTAINER_NAME", i)
assert.Equal(t, failoverLockFile, env["FAILOVER_LOCK_PATH"], "engine-%d FAILOVER_LOCK_PATH", i)
assert.Equal(t, "true", env["DYN_VLLM_GMS_SHADOW_MODE"], "engine-%d shadow mode", i)
assert.Equal(t, "notready", env["DYN_SYSTEM_STARTING_HEALTH_STATUS"], "engine-%d starting health", i)
assert.Equal(t, "true", env["DYN_SYSTEM_ENABLED"], "engine-%d system enabled", i)
// Removed env vars should not be present
_, hasOldHealth := env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"]
assert.False(t, hasOldHealth, "engine-%d should not have DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", i)
_, hasHealthCheck := env["DYN_HEALTH_CHECK_ENABLED"]
assert.False(t, hasHealthCheck, "engine-%d should not have DYN_HEALTH_CHECK_ENABLED", i)
}
}
func TestBuildFailoverPod_StaggeredPorts(t *testing.T) {
ps := failoverPodSpec()
err := buildFailoverPod(&ps, 1, BackendFrameworkVLLM)
require.NoError(t, err)
for i := range 2 {
engine := ps.Containers[i]
env := envToMap(engine.Env)
assert.Equal(t, strconv.Itoa(commonconsts.DynamoSystemPort+i), env["DYN_SYSTEM_PORT"])
require.Len(t, engine.Ports, 1)
assert.Equal(t, int32(commonconsts.DynamoSystemPort+i), engine.Ports[0].ContainerPort)
assert.Equal(t, fmt.Sprintf("system-%d", i), engine.Ports[0].Name)
}
}
func TestBuildFailoverPod_ProbesRetargetedToNamedPort(t *testing.T) {
ps := failoverPodSpec()
err := buildFailoverPod(&ps, 1, BackendFrameworkVLLM)
require.NoError(t, err)
for i := range 2 {
engine := ps.Containers[i]
expectedPort := intstr.FromString(fmt.Sprintf("system-%d", i))
if engine.StartupProbe != nil && engine.StartupProbe.HTTPGet != nil {
assert.Equal(t, expectedPort, engine.StartupProbe.HTTPGet.Port)
}
if engine.LivenessProbe != nil && engine.LivenessProbe.HTTPGet != nil {
assert.Equal(t, expectedPort, engine.LivenessProbe.HTTPGet.Port)
}
if engine.ReadinessProbe != nil && engine.ReadinessProbe.HTTPGet != nil {
assert.Equal(t, expectedPort, engine.ReadinessProbe.HTTPGet.Port)
}
}
}
func TestBuildFailoverPod_PreservesDRAClaim(t *testing.T) {
ps := failoverPodSpec()
err := buildFailoverPod(&ps, 1, BackendFrameworkVLLM)
require.NoError(t, err)
for i := range 2 {
engine := ps.Containers[i]
require.Len(t, engine.Resources.Claims, 1, "engine-%d should retain DRA claim", i)
assert.Equal(t, gmsDRAClaimName, engine.Resources.Claims[0].Name)
}
}
func TestBuildFailoverPod_PreservesDiscoveryBackend(t *testing.T) {
ps := failoverPodSpec()
err := buildFailoverPod(&ps, 1, BackendFrameworkVLLM)
require.NoError(t, err)
for i := range 2 {
env := envToMap(ps.Containers[i].Env)
assert.Equal(t, "kubernetes", env[commonconsts.DynamoDiscoveryBackendEnvVar])
}
}
func TestBuildFailoverPod_MultinodeNNODES(t *testing.T) {
ps := failoverPodSpec()
err := buildFailoverPod(&ps, 4, BackendFrameworkVLLM)
require.NoError(t, err)
for i := range 2 {
env := envToMap(ps.Containers[i].Env)
assert.Equal(t, "4", env["NNODES"], "engine-%d should have NNODES=4", i)
}
}
func TestBuildFailoverPod_SingleNodeNoNNODES(t *testing.T) {
ps := failoverPodSpec()
err := buildFailoverPod(&ps, 1, BackendFrameworkVLLM)
require.NoError(t, err)
for i := range 2 {
env := envToMap(ps.Containers[i].Env)
_, has := env["NNODES"]
assert.False(t, has, "engine-%d should not have NNODES for single-node", i)
}
}
// --- isFailoverEnabled ---
func TestIsFailoverEnabled(t *testing.T) {
assert.True(t, isFailoverEnabled(&v1alpha1.DynamoComponentDeploymentSharedSpec{
Failover: &v1alpha1.FailoverSpec{Enabled: true},
}))
assert.False(t, isFailoverEnabled(&v1alpha1.DynamoComponentDeploymentSharedSpec{
Failover: &v1alpha1.FailoverSpec{Enabled: false},
}))
assert.False(t, isFailoverEnabled(&v1alpha1.DynamoComponentDeploymentSharedSpec{}))
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package dynamo
import (
"strconv"
"strings"
corev1 "k8s.io/api/core/v1"
)
const (
vllmMasterPortFlag = "--master-port"
vllmMasterPortStride = 100
)
// applyVLLMOverrides injects vLLM-specific env vars into all engine containers.
// Port staggering (NIXL side channel, KV event, master port) prevents collisions
// between engines sharing the same pod network namespace.
// For multinode deployments, it also injects NNODES so engines know the group size.
func applyVLLMOverrides(podSpec *corev1.PodSpec, numberOfNodes int32) {
for i := range podSpec.Containers {
c := &podSpec.Containers[i]
if !strings.HasPrefix(c.Name, "engine-") {
continue
}
engineID, _ := strconv.Atoi(strings.TrimPrefix(c.Name, "engine-"))
c.Env = append(c.Env,
corev1.EnvVar{Name: "DYN_VLLM_GMS_SHADOW_MODE", Value: "true"},
corev1.EnvVar{Name: "VLLM_NIXL_SIDE_CHANNEL_PORT", Value: strconv.Itoa(5600 + engineID)},
corev1.EnvVar{Name: "DYN_VLLM_KV_EVENT_PORT", Value: strconv.Itoa(20080 + engineID)},
)
// Stagger --master-port for TP so each engine group uses a distinct
// torch.distributed TCP store. engine-0 keeps the default (29500),
// engine-1 gets 29500 + stride.
if engineID > 0 {
if hasMasterPortFlag(c) {
staggerMasterPort(c, engineID)
} else {
c.Args = append(c.Args, vllmMasterPortFlag, strconv.Itoa(29500+engineID*vllmMasterPortStride))
}
}
if numberOfNodes > 1 {
c.Env = append(c.Env,
corev1.EnvVar{Name: "NNODES", Value: strconv.Itoa(int(numberOfNodes))},
)
}
}
}
// hasMasterPortFlag checks if --master-port appears in the container args or command.
func hasMasterPortFlag(container *corev1.Container) bool {
for _, arg := range container.Args {
if arg == vllmMasterPortFlag || strings.Contains(arg, vllmMasterPortFlag+" ") {
return true
}
}
for _, cmd := range container.Command {
if strings.Contains(cmd, vllmMasterPortFlag+" ") {
return true
}
}
return false
}
func staggerMasterPort(container *corev1.Container, engineID int) {
offset := engineID * vllmMasterPortStride
staggerFlagValue(container, vllmMasterPortFlag, offset)
}
// staggerFlagValue finds a --flag VALUE pair in container args and adds offset
// to the integer value. Handles both separate-token args (["--flag", "29500"])
// and shell-wrapped args (["sh", "-c", "... --flag 29500 ..."]).
func staggerFlagValue(container *corev1.Container, flag string, offset int) {
for i, arg := range container.Args {
if arg == flag && i+1 < len(container.Args) {
if port, err := strconv.Atoi(container.Args[i+1]); err == nil {
container.Args[i+1] = strconv.Itoa(port + offset)
return
}
}
}
for i, arg := range container.Args {
if strings.Contains(arg, flag+" ") {
parts := strings.Split(arg, flag+" ")
if len(parts) < 2 {
continue
}
var portStr string
for _, ch := range parts[1] {
if ch >= '0' && ch <= '9' {
portStr += string(ch)
} else {
break
}
}
if port, err := strconv.Atoi(portStr); err == nil {
container.Args[i] = strings.Replace(arg, flag+" "+portStr, flag+" "+strconv.Itoa(port+offset), 1)
return
}
}
}
for i, cmd := range container.Command {
if strings.Contains(cmd, flag+" ") {
parts := strings.Split(cmd, flag+" ")
if len(parts) < 2 {
continue
}
var portStr string
for _, ch := range parts[1] {
if ch >= '0' && ch <= '9' {
portStr += string(ch)
} else {
break
}
}
if port, err := strconv.Atoi(portStr); err == nil {
container.Command[i] = strings.Replace(cmd, flag+" "+portStr, flag+" "+strconv.Itoa(port+offset), 1)
return
}
}
}
}
......@@ -1190,6 +1190,14 @@ func GenerateBasePodSpec(
}
}
// Clone main container into two engine containers (active + standby) for failover.
// Runs after GMS so the main container already has DRA claims and shared volume.
if isFailoverEnabled(component) {
if err := buildFailoverPod(&podSpec, numberOfNodes, backendFramework); err != nil {
return nil, fmt.Errorf("failed to build failover pod: %w", err)
}
}
return &podSpec, nil
}
......
......@@ -98,6 +98,11 @@ func (v *DynamoGraphDeploymentValidator) Validate(ctx context.Context) (admissio
return nil, err
}
// Validate that failover-enabled services have the required discovery mode annotation
if err := v.validateFailoverRequiresDiscoveryMode(); err != nil {
return nil, err
}
var allWarnings admission.Warnings
// Validate each service
......@@ -790,3 +795,29 @@ func (v *DynamoGraphDeploymentValidator) validateNoRestartDuringRollingUpdate(ol
return nil
}
// validateFailoverRequiresDiscoveryMode checks that when any service has
// failover enabled, the DGD carries the nvidia.com/dynamo-kube-discovery-mode
// annotation set to "container". Failover pods produce multiple engine
// containers that each need their own discovery identity.
func (v *DynamoGraphDeploymentValidator) validateFailoverRequiresDiscoveryMode() error {
hasFailover := false
for _, svc := range v.deployment.Spec.Services {
if svc != nil && svc.Failover != nil && svc.Failover.Enabled {
hasFailover = true
break
}
}
if !hasFailover {
return nil
}
annotations := v.deployment.GetAnnotations()
if annotations == nil || annotations[consts.KubeAnnotationDynamoKubeDiscoveryMode] != "container" {
return fmt.Errorf(
"failover requires per-container K8s discovery; set annotation %q to %q on the DynamoGraphDeployment",
consts.KubeAnnotationDynamoKubeDiscoveryMode, "container")
}
return nil
}
......@@ -134,6 +134,11 @@ func (v *SharedSpecValidator) Validate(ctx context.Context) (admission.Warnings,
return nil, err
}
// Validate failover configuration
if err := v.validateFailover(); err != nil {
return nil, err
}
return warnings, nil
}
......@@ -261,7 +266,39 @@ func (v *SharedSpecValidator) validateFrontendSidecar() error {
return nil
}
// validateGPUMemoryService validates the GPU memory service configuration.
// validateFailover validates the failover configuration for a service.
// Structural checks only — DRA/DeviceClass availability is checked by the controller
// at reconcile time (same pattern as Grove orchestrator availability).
func (v *SharedSpecValidator) validateFailover() error {
if v.spec.Failover == nil || !v.spec.Failover.Enabled {
return nil
}
// Failover requires GPU memory service
if v.spec.GPUMemoryService == nil || !v.spec.GPUMemoryService.Enabled {
return fmt.Errorf(
"%s.failover: failover requires gpuMemoryService.enabled to be true",
v.fieldPath)
}
// Failover mode must match GMS mode when both are set
if v.spec.Failover.Mode != "" && v.spec.GPUMemoryService.Mode != "" &&
v.spec.Failover.Mode != v.spec.GPUMemoryService.Mode {
return fmt.Errorf(
"%s.failover: failover.mode %q must match gpuMemoryService.mode %q",
v.fieldPath, v.spec.Failover.Mode, v.spec.GPUMemoryService.Mode)
}
// interPod failover is not yet supported
if v.spec.Failover.Mode == nvidiacomv1alpha1.GMSModeInterPod {
return fmt.Errorf(
"%s.failover: mode \"interPod\" is not yet supported",
v.fieldPath)
}
return nil
}
func (v *SharedSpecValidator) validateGPUMemoryService() error {
if v.spec.GPUMemoryService == nil || !v.spec.GPUMemoryService.Enabled {
return nil
......
......@@ -404,6 +404,7 @@ _Appears in:_
| `checkpoint` _[ServiceCheckpointConfig](#servicecheckpointconfig)_ | Checkpoint configures container checkpointing for this service.<br />When enabled, pods can be restored from a checkpoint files for faster cold start. | | Optional: \{\} <br /> |
| `topologyConstraint` _[TopologyConstraint](#topologyconstraint)_ | TopologyConstraint for this service. packDomain is required.<br />When both this and spec.topologyConstraint.packDomain are set, packDomain<br />must be narrower than or equal to the spec-level packDomain. | | Optional: \{\} <br /> |
| `gpuMemoryService` _[GPUMemoryServiceSpec](#gpumemoryservicespec)_ | GPUMemoryService configures the GPU Memory Service (GMS) sidecar.<br />When enabled, a GMS sidecar is injected and GPU access is managed via DRA. | | Optional: \{\} <br /> |
| `failover` _[FailoverSpec](#failoverspec)_ | Failover configures active-passive GPU failover for this service.<br />When enabled, the main container is cloned into two engine containers<br />(active + standby) sharing GPUs via DRA. Requires gpuMemoryService.enabled. | | Optional: \{\} <br /> |
#### DynamoComponentDeploymentSpec
......@@ -447,6 +448,7 @@ _Appears in:_
| `checkpoint` _[ServiceCheckpointConfig](#servicecheckpointconfig)_ | Checkpoint configures container checkpointing for this service.<br />When enabled, pods can be restored from a checkpoint files for faster cold start. | | Optional: \{\} <br /> |
| `topologyConstraint` _[TopologyConstraint](#topologyconstraint)_ | TopologyConstraint for this service. packDomain is required.<br />When both this and spec.topologyConstraint.packDomain are set, packDomain<br />must be narrower than or equal to the spec-level packDomain. | | Optional: \{\} <br /> |
| `gpuMemoryService` _[GPUMemoryServiceSpec](#gpumemoryservicespec)_ | GPUMemoryService configures the GPU Memory Service (GMS) sidecar.<br />When enabled, a GMS sidecar is injected and GPU access is managed via DRA. | | Optional: \{\} <br /> |
| `failover` _[FailoverSpec](#failoverspec)_ | Failover configures active-passive GPU failover for this service.<br />When enabled, the main container is cloned into two engine containers<br />(active + standby) sharing GPUs via DRA. Requires gpuMemoryService.enabled. | | Optional: \{\} <br /> |
#### DynamoGraphDeployment
......@@ -802,6 +804,27 @@ _Appears in:_
| `mainContainer` _[Container](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#container-v1-core)_ | | | |
#### FailoverSpec
FailoverSpec configures active-passive failover for a worker component.
Requires gpuMemoryService.enabled and the nvidia.com/dynamo-kube-discovery-mode: container
annotation on the DGD.
_Appears in:_
- [DynamoComponentDeploymentSharedSpec](#dynamocomponentdeploymentsharedspec)
- [DynamoComponentDeploymentSpec](#dynamocomponentdeploymentspec)
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `enabled` _boolean_ | Enabled activates failover mode. The main container is cloned into two<br />engine containers (active + standby) sharing GPUs via DRA. The standby<br />acquires the flock when the active engine fails. | | |
| `mode` _[GPUMemoryServiceMode](#gpumemoryservicemode)_ | Mode selects the failover deployment topology. Must match gpuMemoryService.mode. | intraPod | Enum: [intraPod interPod] <br />Optional: \{\} <br /> |
| `numShadows` _integer_ | NumShadows is the number of shadow (standby) engine containers per rank.<br />Reserved for future use — the operator currently creates exactly one shadow. | 1 | Maximum: 1 <br />Minimum: 1 <br />Optional: \{\} <br /> |
#### FrontendSidecarSpec
......@@ -833,6 +856,7 @@ GPUMemoryServiceMode selects the GMS deployment topology.
_Appears in:_
- [FailoverSpec](#failoverspec)
- [GPUMemoryServiceSpec](#gpumemoryservicespec)
| Field | Description |
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Active-passive GPU failover example.
#
# The operator clones the worker's main container into two engine containers
# (engine-0 active, engine-1 standby) sharing GPUs via DRA and the GMS sidecar.
# When the active engine fails, the standby acquires the flock and takes over.
#
# Requires:
# - gpuMemoryService.enabled: true (GMS sidecar + DRA)
# - nvidia.com/dynamo-kube-discovery-mode: container (per-container K8s discovery)
# - Kubernetes 1.32+ with DRA enabled and the NVIDIA GPU DRA driver installed
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: vllm-agg-failover
annotations:
nvidia.com/dynamo-kube-discovery-mode: container
spec:
services:
Frontend:
envFromSecret: hf-token-secret
componentType: frontend
replicas: 1
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
VllmWorker:
envFromSecret: hf-token-secret
componentType: worker
replicas: 1
resources:
limits:
gpu: "2"
requests:
custom:
ephemeral-storage: "2Gi"
gpuMemoryService:
enabled: true
failover:
enabled: true
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
workingDir: /workspace/examples/backends/vllm
command:
- python3
- -m
- dynamo.vllm
args:
- --model
- Qwen/Qwen3-0.6B
- --tensor-parallel-size
- "2"
- --load-format
- gms
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment