Unverified Commit 2f658327 authored by mohammedabdulwahhab's avatar mohammedabdulwahhab Committed by GitHub
Browse files

feat: per-container kube discovery for multi-engine pods (#8067)


Signed-off-by: mohammedabdulwahhab <furkhan324@berkeley.edu>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 47897f36
...@@ -311,6 +311,16 @@ const ( ...@@ -311,6 +311,16 @@ const (
DiscoveryBackendEtcd DiscoveryBackend = "etcd" DiscoveryBackendEtcd DiscoveryBackend = "etcd"
) )
// KubeDiscoveryMode selects the granularity at which a workload is identified
// in Kubernetes-based discovery: one identity for the whole pod, or one
// identity per container.
type KubeDiscoveryMode string
const (
// KubeDiscoveryModePod is the default mode: the pod as a whole has a single
// discovery identity.
KubeDiscoveryModePod KubeDiscoveryMode = "pod"
// KubeDiscoveryModeContainer is the per-container mode: each container
// registers independently with the discovery plane.
KubeDiscoveryModeContainer KubeDiscoveryMode = "container"
)
// GPUConfiguration holds GPU discovery settings. // GPUConfiguration holds GPU discovery settings.
type GPUConfiguration struct { type GPUConfiguration struct {
// DiscoveryEnabled indicates whether GPU discovery is enabled // DiscoveryEnabled indicates whether GPU discovery is enabled
......
...@@ -42,6 +42,7 @@ const ( ...@@ -42,6 +42,7 @@ const (
KubeAnnotationDisableImagePullSecretDiscovery = "nvidia.com/disable-image-pull-secret-discovery" KubeAnnotationDisableImagePullSecretDiscovery = "nvidia.com/disable-image-pull-secret-discovery"
KubeAnnotationDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend" KubeAnnotationDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend"
KubeAnnotationDynamoKubeDiscoveryMode = "nvidia.com/dynamo-kube-discovery-mode"
KubeLabelDynamoGraphDeploymentName = "nvidia.com/dynamo-graph-deployment-name" KubeLabelDynamoGraphDeploymentName = "nvidia.com/dynamo-graph-deployment-name"
KubeLabelDynamoComponent = "nvidia.com/dynamo-component" KubeLabelDynamoComponent = "nvidia.com/dynamo-component"
......
...@@ -41,7 +41,10 @@ func buildCheckpointWorkerDefaultEnv( ...@@ -41,7 +41,10 @@ func buildCheckpointWorkerDefaultEnv(
DynamoNamespace: dynamoNamespace, DynamoNamespace: dynamoNamespace,
ParentGraphDeploymentName: parentGraphDeploymentName, ParentGraphDeploymentName: parentGraphDeploymentName,
ParentGraphDeploymentNamespace: ckpt.Namespace, ParentGraphDeploymentNamespace: ckpt.Namespace,
DiscoveryBackend: discoveryBackend, Discovery: dynamo.DiscoveryContext{
Backend: discoveryBackend,
Mode: configv1alpha1.KubeDiscoveryModePod,
},
WorkerHashSuffix: workerHashSuffix, WorkerHashSuffix: workerHashSuffix,
}) })
return defaultContainer.Env return defaultContainer.Env
......
...@@ -1095,6 +1095,12 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex ...@@ -1095,6 +1095,12 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
podLabels[commonconsts.KubeLabelDynamoSelector] = kubeName podLabels[commonconsts.KubeLabelDynamoSelector] = kubeName
// Add discovery labels to pod template for Pod-based daemon filtering
if commonController.IsK8sDiscoveryEnabled(r.Config.Discovery.Backend, opt.dynamoComponentDeployment.Spec.Annotations) {
podLabels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes"
podLabels[commonconsts.KubeLabelDynamoDiscoveryEnabled] = commonconsts.KubeLabelValueTrue
}
extraPodMetadata := opt.dynamoComponentDeployment.Spec.ExtraPodMetadata extraPodMetadata := opt.dynamoComponentDeployment.Spec.ExtraPodMetadata
if extraPodMetadata != nil { if extraPodMetadata != nil {
......
...@@ -116,6 +116,14 @@ func IsK8sDiscoveryEnabled(discoveryBackend configv1alpha1.DiscoveryBackend, ann ...@@ -116,6 +116,14 @@ func IsK8sDiscoveryEnabled(discoveryBackend configv1alpha1.DiscoveryBackend, ann
return GetDiscoveryBackend(discoveryBackend, annotations) == configv1alpha1.DiscoveryBackendKubernetes return GetDiscoveryBackend(discoveryBackend, annotations) == configv1alpha1.DiscoveryBackendKubernetes
} }
// GetKubeDiscoveryMode reads the kube discovery mode annotation
// (commonconsts.KubeAnnotationDynamoKubeDiscoveryMode) from annotations.
//
// When the annotation key is absent (including when annotations is nil), pod
// mode is returned. When the key is present, its value is converted to a
// KubeDiscoveryMode as-is; no validation is performed here.
func GetKubeDiscoveryMode(annotations map[string]string) configv1alpha1.KubeDiscoveryMode {
	raw, found := annotations[commonconsts.KubeAnnotationDynamoKubeDiscoveryMode]
	if !found {
		// No explicit request: fall back to the per-pod default.
		return configv1alpha1.KubeDiscoveryModePod
	}
	return configv1alpha1.KubeDiscoveryMode(raw)
}
// EphemeralDeploymentEventFilter returns a predicate that filters events based on namespace configuration. // EphemeralDeploymentEventFilter returns a predicate that filters events based on namespace configuration.
func EphemeralDeploymentEventFilter(config *configv1alpha1.OperatorConfiguration, runtimeConfig *RuntimeConfig) predicate.Predicate { func EphemeralDeploymentEventFilter(config *configv1alpha1.OperatorConfiguration, runtimeConfig *RuntimeConfig) predicate.Predicate {
return predicate.NewPredicateFuncs(func(o client.Object) bool { return predicate.NewPredicateFuncs(func(o client.Object) bool {
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1" configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts" commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
controller_common "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
) )
...@@ -43,13 +44,27 @@ func ComponentDefaultsFactory(componentType string) ComponentDefaults { ...@@ -43,13 +44,27 @@ func ComponentDefaultsFactory(componentType string) ComponentDefaults {
// BaseComponentDefaults provides common defaults shared by all components // BaseComponentDefaults provides common defaults shared by all components
type BaseComponentDefaults struct{} type BaseComponentDefaults struct{}
// DiscoveryContext holds the resolved discovery settings for a component:
// which discovery backend it uses and at which granularity it is identified
// when using Kubernetes discovery.
type DiscoveryContext struct {
// Backend is the discovery backend in effect for the component.
Backend configv1alpha1.DiscoveryBackend
// Mode is the kube discovery identity granularity (pod or container).
Mode configv1alpha1.KubeDiscoveryMode
}
// NewDiscoveryContext builds a DiscoveryContext for a component.
//
// The backend is resolved by controller_common.GetDiscoveryBackend from the
// operator-level defaultBackend and the component annotations; the mode is
// resolved by controller_common.GetKubeDiscoveryMode from the annotations
// alone.
func NewDiscoveryContext(defaultBackend configv1alpha1.DiscoveryBackend, annotations map[string]string) DiscoveryContext {
	var resolved DiscoveryContext
	resolved.Backend = controller_common.GetDiscoveryBackend(defaultBackend, annotations)
	resolved.Mode = controller_common.GetKubeDiscoveryMode(annotations)
	return resolved
}
type ComponentContext struct { type ComponentContext struct {
numberOfNodes int32 numberOfNodes int32
DynamoNamespace string DynamoNamespace string
ComponentType string ComponentType string
ParentGraphDeploymentName string ParentGraphDeploymentName string
ParentGraphDeploymentNamespace string ParentGraphDeploymentNamespace string
DiscoveryBackend configv1alpha1.DiscoveryBackend Discovery DiscoveryContext
EPPConfig *v1alpha1.EPPConfig EPPConfig *v1alpha1.EPPConfig
WorkerHashSuffix string WorkerHashSuffix string
} }
...@@ -121,12 +136,23 @@ func (b *BaseComponentDefaults) getCommonContainer(context ComponentContext) cor ...@@ -121,12 +136,23 @@ func (b *BaseComponentDefaults) getCommonContainer(context ComponentContext) cor
} }
// Set discovery backend env var to "kubernetes" unless explicitly set to "etcd" // Set discovery backend env var to "kubernetes" unless explicitly set to "etcd"
if context.DiscoveryBackend != "etcd" { if context.Discovery.Backend != "etcd" {
container.Env = append(container.Env, corev1.EnvVar{ container.Env = append(container.Env, corev1.EnvVar{
Name: commonconsts.DynamoDiscoveryBackendEnvVar, Name: commonconsts.DynamoDiscoveryBackendEnvVar,
Value: "kubernetes", Value: "kubernetes",
}) })
} }
if context.Discovery.Mode == configv1alpha1.KubeDiscoveryModeContainer {
container.Env = append(container.Env, corev1.EnvVar{
Name: "CONTAINER_NAME",
Value: container.Name,
})
container.Env = append(container.Env, corev1.EnvVar{
Name: "DYN_KUBE_DISCOVERY_MODE",
Value: string(configv1alpha1.KubeDiscoveryModeContainer),
})
}
return container return container
} }
...@@ -1002,7 +1002,7 @@ func GenerateBasePodSpec( ...@@ -1002,7 +1002,7 @@ func GenerateBasePodSpec(
checkpointInfo *checkpoint.CheckpointInfo, // Optional checkpoint info (resolved by ResolveCheckpointForService) checkpointInfo *checkpoint.CheckpointInfo, // Optional checkpoint info (resolved by ResolveCheckpointForService)
) (*corev1.PodSpec, error) { ) (*corev1.PodSpec, error) {
// Start with base container generated per component type // Start with base container generated per component type
componentContext := generateComponentContext(component, parentGraphDeploymentName, namespace, numberOfNodes, controller_common.GetDiscoveryBackend(operatorConfig.Discovery.Backend, component.Annotations)) componentContext := generateComponentContext(component, parentGraphDeploymentName, namespace, numberOfNodes, NewDiscoveryContext(operatorConfig.Discovery.Backend, component.Annotations))
componentDefaults := ComponentDefaultsFactory(component.ComponentType) componentDefaults := ComponentDefaultsFactory(component.ComponentType)
container, err := componentDefaults.GetBaseContainer(componentContext) container, err := componentDefaults.GetBaseContainer(componentContext)
if err != nil { if err != nil {
...@@ -1204,7 +1204,7 @@ func setMetricsLabels(labels map[string]string, dynamoGraphDeployment *v1alpha1. ...@@ -1204,7 +1204,7 @@ func setMetricsLabels(labels map[string]string, dynamoGraphDeployment *v1alpha1.
labels[commonconsts.KubeLabelMetricsEnabled] = commonconsts.KubeLabelValueTrue labels[commonconsts.KubeLabelMetricsEnabled] = commonconsts.KubeLabelValueTrue
} }
func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentSharedSpec, parentGraphDeploymentName string, namespace string, numberOfNodes int32, discoveryBackend configv1alpha1.DiscoveryBackend) ComponentContext { func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentSharedSpec, parentGraphDeploymentName string, namespace string, numberOfNodes int32, discovery DiscoveryContext) ComponentContext {
dynamoNamespace := v1alpha1.ComputeDynamoNamespace(component.GlobalDynamoNamespace, namespace, parentGraphDeploymentName) dynamoNamespace := v1alpha1.ComputeDynamoNamespace(component.GlobalDynamoNamespace, namespace, parentGraphDeploymentName)
var workerHashSuffix string var workerHashSuffix string
if IsWorkerComponent(component.ComponentType) && component.Labels[commonconsts.KubeLabelDynamoWorkerHash] != "" { if IsWorkerComponent(component.ComponentType) && component.Labels[commonconsts.KubeLabelDynamoWorkerHash] != "" {
...@@ -1216,7 +1216,7 @@ func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentShare ...@@ -1216,7 +1216,7 @@ func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentShare
ComponentType: component.ComponentType, ComponentType: component.ComponentType,
ParentGraphDeploymentName: parentGraphDeploymentName, ParentGraphDeploymentName: parentGraphDeploymentName,
ParentGraphDeploymentNamespace: namespace, ParentGraphDeploymentNamespace: namespace,
DiscoveryBackend: discoveryBackend, Discovery: discovery,
DynamoNamespace: dynamoNamespace, DynamoNamespace: dynamoNamespace,
EPPConfig: component.EPPConfig, EPPConfig: component.EPPConfig,
WorkerHashSuffix: workerHashSuffix, WorkerHashSuffix: workerHashSuffix,
...@@ -1238,7 +1238,7 @@ func generateFrontendSidecar( ...@@ -1238,7 +1238,7 @@ func generateFrontendSidecar(
ComponentType: commonconsts.ComponentTypeFrontend, ComponentType: commonconsts.ComponentTypeFrontend,
ParentGraphDeploymentName: parentContext.ParentGraphDeploymentName, ParentGraphDeploymentName: parentContext.ParentGraphDeploymentName,
ParentGraphDeploymentNamespace: parentContext.ParentGraphDeploymentNamespace, ParentGraphDeploymentNamespace: parentContext.ParentGraphDeploymentNamespace,
DiscoveryBackend: parentContext.DiscoveryBackend, Discovery: parentContext.Discovery,
DynamoNamespace: parentContext.DynamoNamespace, DynamoNamespace: parentContext.DynamoNamespace,
} }
...@@ -1305,6 +1305,7 @@ func GeneratePodSpecForComponent( ...@@ -1305,6 +1305,7 @@ func GeneratePodSpecForComponent(
var dgdPropagatedAnnotationKeys = []string{ var dgdPropagatedAnnotationKeys = []string{
commonconsts.KubeAnnotationEnableMetrics, commonconsts.KubeAnnotationEnableMetrics,
commonconsts.KubeAnnotationDynamoDiscoveryBackend, commonconsts.KubeAnnotationDynamoDiscoveryBackend,
commonconsts.KubeAnnotationDynamoKubeDiscoveryMode,
commonconsts.KubeAnnotationDynamoOperatorOriginVersion, commonconsts.KubeAnnotationDynamoOperatorOriginVersion,
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend, commonconsts.KubeAnnotationVLLMDistributedExecutorBackend,
} }
...@@ -1387,6 +1388,7 @@ func GenerateGrovePodCliqueSet( ...@@ -1387,6 +1388,7 @@ func GenerateGrovePodCliqueSet(
} }
discoveryBackend := controller_common.GetDiscoveryBackend(operatorConfig.Discovery.Backend, dynamoDeployment.Annotations) discoveryBackend := controller_common.GetDiscoveryBackend(operatorConfig.Discovery.Backend, dynamoDeployment.Annotations)
discoveryContext := NewDiscoveryContext(operatorConfig.Discovery.Backend, dynamoDeployment.Annotations)
var scalingGroups []grovev1alpha1.PodCliqueScalingGroupConfig var scalingGroups []grovev1alpha1.PodCliqueScalingGroupConfig
for serviceName, component := range dynamoDeployment.Spec.Services { for serviceName, component := range dynamoDeployment.Spec.Services {
...@@ -1432,6 +1434,7 @@ func GenerateGrovePodCliqueSet( ...@@ -1432,6 +1434,7 @@ func GenerateGrovePodCliqueSet(
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to generate podSpec for role %s: %w", r.Name, err) return nil, fmt.Errorf("failed to generate podSpec for role %s: %w", r.Name, err)
} }
if operatorConfig.Checkpoint.Enabled { if operatorConfig.Checkpoint.Enabled {
if err := checkpoint.InjectCheckpointIntoPodSpec( if err := checkpoint.InjectCheckpointIntoPodSpec(
ctx, ctx,
...@@ -1465,7 +1468,7 @@ func GenerateGrovePodCliqueSet( ...@@ -1465,7 +1468,7 @@ func GenerateGrovePodCliqueSet(
if !isMultinode { if !isMultinode {
clique.TopologyConstraint = toGroveTopologyConstraint(component.TopologyConstraint) clique.TopologyConstraint = toGroveTopologyConstraint(component.TopologyConstraint)
} }
labels, err := generateLabels(component, dynamoDeployment, serviceName) labels, err := generateLabels(component, dynamoDeployment, serviceName, discoveryContext)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to generate labels: %w", err) return nil, fmt.Errorf("failed to generate labels: %w", err)
} }
...@@ -1525,6 +1528,7 @@ func generateLabels( ...@@ -1525,6 +1528,7 @@ func generateLabels(
component *v1alpha1.DynamoComponentDeploymentSharedSpec, component *v1alpha1.DynamoComponentDeploymentSharedSpec,
dynamoDeployment *v1alpha1.DynamoGraphDeployment, dynamoDeployment *v1alpha1.DynamoGraphDeployment,
componentName string, componentName string,
discovery DiscoveryContext,
) (map[string]string, error) { ) (map[string]string, error) {
labels := make(map[string]string) labels := make(map[string]string)
labels[commonconsts.KubeLabelDynamoSelector] = GetDCDResourceName(dynamoDeployment, componentName, "") labels[commonconsts.KubeLabelDynamoSelector] = GetDCDResourceName(dynamoDeployment, componentName, "")
...@@ -1553,6 +1557,7 @@ func generateLabels( ...@@ -1553,6 +1557,7 @@ func generateLabels(
return nil, fmt.Errorf("failed to merge extraPodMetadata labels: %w", err) return nil, fmt.Errorf("failed to merge extraPodMetadata labels: %w", err)
} }
} }
// Re-apply system labels after user merge to prevent override
labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = dynamoDeployment.Name labels[commonconsts.KubeLabelDynamoGraphDeploymentName] = dynamoDeployment.Name
if component.ComponentType != "" { if component.ComponentType != "" {
labels[commonconsts.KubeLabelDynamoComponentType] = component.ComponentType labels[commonconsts.KubeLabelDynamoComponentType] = component.ComponentType
...@@ -1563,6 +1568,11 @@ func generateLabels( ...@@ -1563,6 +1568,11 @@ func generateLabels(
if workerHash := component.Labels[commonconsts.KubeLabelDynamoWorkerHash]; workerHash != "" { if workerHash := component.Labels[commonconsts.KubeLabelDynamoWorkerHash]; workerHash != "" {
labels[commonconsts.KubeLabelDynamoWorkerHash] = workerHash labels[commonconsts.KubeLabelDynamoWorkerHash] = workerHash
} }
// Discovery labels on pod template — needed for Pod reflector filtering in container mode
if discovery.Backend == configv1alpha1.DiscoveryBackendKubernetes {
labels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes"
labels[commonconsts.KubeLabelDynamoDiscoveryEnabled] = commonconsts.KubeLabelValueTrue
}
return labels, nil return labels, nil
} }
......
...@@ -887,7 +887,7 @@ func TestGenerateComponentContext(t *testing.T) { ...@@ -887,7 +887,7 @@ func TestGenerateComponentContext(t *testing.T) {
tt.parentGraphDeploymentName, tt.parentGraphDeploymentName,
tt.namespace, tt.namespace,
tt.numberOfNodes, tt.numberOfNodes,
tt.discoveryBackend, DiscoveryContext{Backend: tt.discoveryBackend, Mode: configv1alpha1.KubeDiscoveryModePod},
) )
assert.Equal(t, tt.expectedDynamoNamespace, ctx.DynamoNamespace, assert.Equal(t, tt.expectedDynamoNamespace, ctx.DynamoNamespace,
...@@ -896,7 +896,7 @@ func TestGenerateComponentContext(t *testing.T) { ...@@ -896,7 +896,7 @@ func TestGenerateComponentContext(t *testing.T) {
assert.Equal(t, tt.expectedParentDGDName, ctx.ParentGraphDeploymentName) assert.Equal(t, tt.expectedParentDGDName, ctx.ParentGraphDeploymentName)
assert.Equal(t, tt.expectedParentDGDNamespace, ctx.ParentGraphDeploymentNamespace) assert.Equal(t, tt.expectedParentDGDNamespace, ctx.ParentGraphDeploymentNamespace)
assert.Equal(t, tt.numberOfNodes, ctx.numberOfNodes) assert.Equal(t, tt.numberOfNodes, ctx.numberOfNodes)
assert.Equal(t, tt.discoveryBackend, ctx.DiscoveryBackend) assert.Equal(t, tt.discoveryBackend, ctx.Discovery.Backend)
}) })
} }
} }
...@@ -5421,8 +5421,7 @@ func TestGenerateBasePodSpec_Worker(t *testing.T) { ...@@ -5421,8 +5421,7 @@ func TestGenerateBasePodSpec_Worker(t *testing.T) {
Args: []string{"-m", "dynamo.worker"}, Args: []string{"-m", "dynamo.worker"},
Env: []corev1.EnvVar{ Env: []corev1.EnvVar{
{Name: "ANOTHER_COMPONENTENV", Value: "true"}, {Name: "ANOTHER_COMPONENTENV", Value: "true"},
{Name: "ANOTHER_CONTAINER_ENV", Value: "true"}, {Name: "ANOTHER_CONTAINER_ENV", Value: "true"}, {Name: commonconsts.DynamoComponentEnvVar, Value: "worker"},
{Name: commonconsts.DynamoComponentEnvVar, Value: "worker"},
{Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"}, {Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"},
{Name: "DYN_HEALTH_CHECK_ENABLED", Value: "false"}, {Name: "DYN_HEALTH_CHECK_ENABLED", Value: "false"},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "default-test-deployment"}, {Name: commonconsts.DynamoNamespaceEnvVar, Value: "default-test-deployment"},
...@@ -6874,6 +6873,7 @@ func TestGenerateLabels_RemovesStaleRestoreLabelsWhenCheckpointNotReady(t *testi ...@@ -6874,6 +6873,7 @@ func TestGenerateLabels_RemovesStaleRestoreLabelsWhenCheckpointNotReady(t *testi
ObjectMeta: metav1.ObjectMeta{Name: "test-dgd"}, ObjectMeta: metav1.ObjectMeta{Name: "test-dgd"},
}, },
"Worker", "Worker",
DiscoveryContext{Backend: configv1alpha1.DiscoveryBackendKubernetes},
) )
require.NoError(t, err) require.NoError(t, err)
annotations := map[string]string{} annotations := map[string]string{}
...@@ -6908,6 +6908,7 @@ func TestGenerateLabels_OverwritesStaleRestoreLabelsWhenCheckpointReady(t *testi ...@@ -6908,6 +6908,7 @@ func TestGenerateLabels_OverwritesStaleRestoreLabelsWhenCheckpointReady(t *testi
ObjectMeta: metav1.ObjectMeta{Name: "test-dgd"}, ObjectMeta: metav1.ObjectMeta{Name: "test-dgd"},
}, },
"Worker", "Worker",
DiscoveryContext{Backend: configv1alpha1.DiscoveryBackendKubernetes},
) )
require.NoError(t, err) require.NoError(t, err)
annotations := map[string]string{} annotations := map[string]string{}
...@@ -6944,6 +6945,7 @@ func TestGenerateLabels_ReassertsRestoreIdentityLabelsAfterMetadataMerge(t *test ...@@ -6944,6 +6945,7 @@ func TestGenerateLabels_ReassertsRestoreIdentityLabelsAfterMetadataMerge(t *test
ObjectMeta: metav1.ObjectMeta{Name: "test-dgd"}, ObjectMeta: metav1.ObjectMeta{Name: "test-dgd"},
}, },
"Worker", "Worker",
DiscoveryContext{Backend: configv1alpha1.DiscoveryBackendKubernetes},
) )
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "default-test-dgd", labels[commonconsts.KubeLabelDynamoNamespace]) assert.Equal(t, "default-test-dgd", labels[commonconsts.KubeLabelDynamoNamespace])
...@@ -7054,14 +7056,14 @@ func TestGenerateComponentContext_WorkerHashSuffix(t *testing.T) { ...@@ -7054,14 +7056,14 @@ func TestGenerateComponentContext_WorkerHashSuffix(t *testing.T) {
ComponentType: commonconsts.ComponentTypeWorker, ComponentType: commonconsts.ComponentTypeWorker,
Labels: map[string]string{commonconsts.KubeLabelDynamoWorkerHash: "abc123"}, Labels: map[string]string{commonconsts.KubeLabelDynamoWorkerHash: "abc123"},
} }
compCtx := generateComponentContext(component, "dgd", "ns", 1, "kubernetes") compCtx := generateComponentContext(component, "dgd", "ns", 1, DiscoveryContext{Backend: "kubernetes", Mode: configv1alpha1.KubeDiscoveryModePod})
assert.Equal(t, "abc123", compCtx.WorkerHashSuffix) assert.Equal(t, "abc123", compCtx.WorkerHashSuffix)
// Worker without hash label // Worker without hash label
component2 := &v1alpha1.DynamoComponentDeploymentSharedSpec{ component2 := &v1alpha1.DynamoComponentDeploymentSharedSpec{
ComponentType: commonconsts.ComponentTypeWorker, ComponentType: commonconsts.ComponentTypeWorker,
} }
compCtx2 := generateComponentContext(component2, "dgd", "ns", 1, "kubernetes") compCtx2 := generateComponentContext(component2, "dgd", "ns", 1, DiscoveryContext{Backend: "kubernetes", Mode: configv1alpha1.KubeDiscoveryModePod})
assert.Empty(t, compCtx2.WorkerHashSuffix) assert.Empty(t, compCtx2.WorkerHashSuffix)
// Frontend never gets WorkerHashSuffix, even with the label // Frontend never gets WorkerHashSuffix, even with the label
...@@ -7069,7 +7071,7 @@ func TestGenerateComponentContext_WorkerHashSuffix(t *testing.T) { ...@@ -7069,7 +7071,7 @@ func TestGenerateComponentContext_WorkerHashSuffix(t *testing.T) {
ComponentType: commonconsts.ComponentTypeFrontend, ComponentType: commonconsts.ComponentTypeFrontend,
Labels: map[string]string{commonconsts.KubeLabelDynamoWorkerHash: "abc123"}, Labels: map[string]string{commonconsts.KubeLabelDynamoWorkerHash: "abc123"},
} }
compCtx3 := generateComponentContext(component3, "dgd", "ns", 1, "kubernetes") compCtx3 := generateComponentContext(component3, "dgd", "ns", 1, DiscoveryContext{Backend: "kubernetes", Mode: configv1alpha1.KubeDiscoveryModePod})
assert.Empty(t, compCtx3.WorkerHashSuffix) assert.Empty(t, compCtx3.WorkerHashSuffix)
} }
......
...@@ -467,6 +467,17 @@ func (v *DynamoGraphDeploymentValidator) validateAnnotations() error { ...@@ -467,6 +467,17 @@ func (v *DynamoGraphDeploymentValidator) validateAnnotations() error {
} }
} }
// Validate kube discovery mode
if value, exists := annotations[consts.KubeAnnotationDynamoKubeDiscoveryMode]; exists {
switch value {
case "pod", "container":
// valid
default:
errs = append(errs, fmt.Errorf("annotation %s has invalid value %q: must be \"pod\" or \"container\"",
consts.KubeAnnotationDynamoKubeDiscoveryMode, value))
}
}
return errors.Join(errs...) return errors.Join(errs...)
} }
......
...@@ -1956,6 +1956,8 @@ _Appears in:_ ...@@ -1956,6 +1956,8 @@ _Appears in:_
| `enabled` _boolean_ | Enabled overrides auto-detection. nil = auto-detect. | | | | `enabled` _boolean_ | Enabled overrides auto-detection. nil = auto-detect. | | |
#### LWSConfiguration #### LWSConfiguration
......
...@@ -392,6 +392,15 @@ pub mod zmq_broker { ...@@ -392,6 +392,15 @@ pub mod zmq_broker {
pub const ZMQ_BROKER_NAMESPACE: &str = "ZMQ_BROKER_NAMESPACE"; pub const ZMQ_BROKER_NAMESPACE: &str = "ZMQ_BROKER_NAMESPACE";
} }
/// Names of the environment variables that configure service discovery.
pub mod discovery {
/// Selects the discovery backend: "kubernetes" or "etcd" (default).
pub const DYN_DISCOVERY_BACKEND: &str = "DYN_DISCOVERY_BACKEND";
/// Selects the kube discovery mode: "pod" (default) or "container"
/// (each container registers independently).
pub const DYN_KUBE_DISCOVERY_MODE: &str = "DYN_KUBE_DISCOVERY_MODE";
}
/// CUDA and GPU environment variables /// CUDA and GPU environment variables
pub mod cuda { pub mod cuda {
/// Path to custom CUDA fatbin file. /// Path to custom CUDA fatbin file.
...@@ -529,6 +538,9 @@ mod tests { ...@@ -529,6 +538,9 @@ mod tests {
zmq_broker::ZMQ_BROKER_XSUB_BIND, zmq_broker::ZMQ_BROKER_XSUB_BIND,
zmq_broker::ZMQ_BROKER_XPUB_BIND, zmq_broker::ZMQ_BROKER_XPUB_BIND,
zmq_broker::ZMQ_BROKER_NAMESPACE, zmq_broker::ZMQ_BROKER_NAMESPACE,
// Discovery
discovery::DYN_DISCOVERY_BACKEND,
discovery::DYN_KUBE_DISCOVERY_MODE,
// CUDA // CUDA
cuda::DYN_FATBIN_PATH, cuda::DYN_FATBIN_PATH,
// Build // Build
......
...@@ -6,11 +6,12 @@ mod daemon; ...@@ -6,11 +6,12 @@ mod daemon;
mod utils; mod utils;
pub use crd::{DynamoWorkerMetadata, DynamoWorkerMetadataSpec}; pub use crd::{DynamoWorkerMetadata, DynamoWorkerMetadataSpec};
// hash_pod_name is used by C bindings (EPP) for pod-level worker ID mapping.
pub use utils::hash_pod_name; pub use utils::hash_pod_name;
use crd::{apply_cr, build_cr}; use crd::{apply_cr, build_cr};
use daemon::DiscoveryDaemon; use daemon::DiscoveryDaemon;
use utils::PodInfo; use utils::{KubeDiscoveryMode, PodInfo};
use crate::CancellationToken; use crate::CancellationToken;
use crate::discovery::{ use crate::discovery::{
...@@ -19,7 +20,7 @@ use crate::discovery::{ ...@@ -19,7 +20,7 @@ use crate::discovery::{
}; };
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use kube::Client as KubeClient; use kube::{Api, Client as KubeClient, api::DeleteParams};
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
...@@ -45,11 +46,14 @@ impl KubeDiscoveryClient { ...@@ -45,11 +46,14 @@ impl KubeDiscoveryClient {
cancel_token: CancellationToken, cancel_token: CancellationToken,
) -> Result<Self> { ) -> Result<Self> {
let pod_info = PodInfo::from_env()?; let pod_info = PodInfo::from_env()?;
let instance_id = hash_pod_name(&pod_info.pod_name); let instance_id = pod_info.target.instance_id();
let cr_name = pod_info.target.cr_name();
tracing::info!( tracing::info!(
"Initializing KubeDiscoveryClient: pod_name={}, instance_id={:x}, namespace={}, pod_uid={}", "Initializing KubeDiscoveryClient: mode={:?}, target={:?}, cr_name={}, instance_id={:x}, namespace={}, pod_uid={}",
pod_info.pod_name, pod_info.mode,
pod_info.target,
cr_name,
instance_id, instance_id,
pod_info.pod_namespace, pod_info.pod_namespace,
pod_info.pod_uid pod_info.pod_uid
...@@ -59,6 +63,27 @@ impl KubeDiscoveryClient { ...@@ -59,6 +63,27 @@ impl KubeDiscoveryClient {
.await .await
.map_err(|e| anyhow::anyhow!("Failed to create Kubernetes client: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to create Kubernetes client: {}", e))?;
// In container mode, delete any stale CR from a previous incarnation of this container.
// In failover pods, the pod stays alive when a container crashes and restarts,
// so the old CR persists. Deleting it ensures the daemon doesn't see stale data.
// In pod mode this is unnecessary — pod restart creates a new pod (and new CR name).
if pod_info.mode == KubeDiscoveryMode::Container {
let cr_api: Api<DynamoWorkerMetadata> =
Api::namespaced(kube_client.clone(), &pod_info.pod_namespace);
match cr_api.delete(&cr_name, &DeleteParams::default()).await {
Ok(_) => tracing::info!("Deleted stale CR: {}", cr_name),
Err(kube::Error::Api(err_resp)) if err_resp.code == 404 => {
tracing::debug!("No stale CR to delete: {}", cr_name);
}
Err(e) => {
panic!(
"Failed to clear stale CR '{}': {} — cannot start with stale discovery state",
cr_name, e
);
}
}
}
// Create watch channel with initial empty snapshot // Create watch channel with initial empty snapshot
let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty())); let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty()));
...@@ -151,7 +176,13 @@ impl Discovery for KubeDiscoveryClient { ...@@ -151,7 +176,13 @@ impl Discovery for KubeDiscoveryClient {
// Build and apply the CR with the updated metadata // Build and apply the CR with the updated metadata
// This persists the metadata to Kubernetes for other pods to discover // This persists the metadata to Kubernetes for other pods to discover
let cr = build_cr(&self.pod_info.pod_name, &self.pod_info.pod_uid, &metadata)?; let cr_name = self.pod_info.target.cr_name();
let cr = build_cr(
&cr_name,
&self.pod_info.pod_name,
&self.pod_info.pod_uid,
&metadata,
)?;
if let Err(e) = apply_cr(&self.kube_client, &self.pod_info.pod_namespace, &cr).await { if let Err(e) = apply_cr(&self.kube_client, &self.pod_info.pod_namespace, &cr).await {
// Rollback local state on CR persistence failure // Rollback local state on CR persistence failure
...@@ -223,7 +254,13 @@ impl Discovery for KubeDiscoveryClient { ...@@ -223,7 +254,13 @@ impl Discovery for KubeDiscoveryClient {
// Build and apply the CR with the updated metadata // Build and apply the CR with the updated metadata
// This persists the removal to Kubernetes for other pods to see // This persists the removal to Kubernetes for other pods to see
let cr = build_cr(&self.pod_info.pod_name, &self.pod_info.pod_uid, &metadata)?; let cr_name = self.pod_info.target.cr_name();
let cr = build_cr(
&cr_name,
&self.pod_info.pod_name,
&self.pod_info.pod_uid,
&metadata,
)?;
if let Err(e) = apply_cr(&self.kube_client, &self.pod_info.pod_namespace, &cr).await { if let Err(e) = apply_cr(&self.kube_client, &self.pod_info.pod_namespace, &cr).await {
// Rollback local state on CR persistence failure // Rollback local state on CR persistence failure
......
...@@ -45,20 +45,22 @@ impl DynamoWorkerMetadataSpec { ...@@ -45,20 +45,22 @@ impl DynamoWorkerMetadataSpec {
/// Build a DynamoWorkerMetadata CR with owner reference set to the pod /// Build a DynamoWorkerMetadata CR with owner reference set to the pod
/// # Arguments /// # Arguments
/// * `pod_name` - Name of the pod (used as CR name and in owner reference) /// * `cr_name` - Name of the CR (from KubeDiscoveryTarget::cr_name)
/// * `pod_name` - Name of the pod (used in owner reference)
/// * `pod_uid` - UID of the pod (for owner reference - enables garbage collection) /// * `pod_uid` - UID of the pod (for owner reference - enables garbage collection)
/// * `metadata` - The DiscoveryMetadata to serialize into the CR's data field /// * `metadata` - The DiscoveryMetadata to serialize into the CR's data field
/// ///
/// # Returns /// # Returns
/// A `DynamoWorkerMetadata` CR ready to be applied to the cluster /// A `DynamoWorkerMetadata` CR ready to be applied to the cluster
pub fn build_cr( pub fn build_cr(
cr_name: &str,
pod_name: &str, pod_name: &str,
pod_uid: &str, pod_uid: &str,
metadata: &DiscoveryMetadata, metadata: &DiscoveryMetadata,
) -> Result<DynamoWorkerMetadata> { ) -> Result<DynamoWorkerMetadata> {
let data = serde_json::to_value(metadata)?; let data = serde_json::to_value(metadata)?;
let spec = DynamoWorkerMetadataSpec::new(data); let spec = DynamoWorkerMetadataSpec::new(data);
let mut cr = DynamoWorkerMetadata::new(pod_name, spec); let mut cr = DynamoWorkerMetadata::new(cr_name, spec);
// Set owner reference to the pod for automatic garbage collection // Set owner reference to the pod for automatic garbage collection
cr.metadata.owner_references = Some(vec![OwnerReference { cr.metadata.owner_references = Some(vec![OwnerReference {
...@@ -66,8 +68,9 @@ pub fn build_cr( ...@@ -66,8 +68,9 @@ pub fn build_cr(
kind: "Pod".to_string(), kind: "Pod".to_string(),
name: pod_name.to_string(), name: pod_name.to_string(),
uid: pod_uid.to_string(), uid: pod_uid.to_string(),
// Mark pod as the controlling owner - CR will be garbage collected when pod is deleted // Mark pod as the controlling owner - CR will be garbage collected when pod is deleted.
controller: Some(true), // In container mode multiple CRs may share one pod; only one can be controller.
controller: Some(cr_name == pod_name),
// Don't block pod deletion - allow CR cleanup to happen asynchronously // Don't block pod deletion - allow CR cleanup to happen asynchronously
block_owner_deletion: Some(false), block_owner_deletion: Some(false),
}]); }]);
......
...@@ -5,6 +5,7 @@ use crate::CancellationToken; ...@@ -5,6 +5,7 @@ use crate::CancellationToken;
use crate::discovery::{DiscoveryMetadata, MetadataSnapshot}; use crate::discovery::{DiscoveryMetadata, MetadataSnapshot};
use anyhow::Result; use anyhow::Result;
use futures::StreamExt; use futures::StreamExt;
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::api::discovery::v1::EndpointSlice; use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{ use kube::{
Api, Client as KubeClient, Api, Client as KubeClient,
...@@ -16,15 +17,109 @@ use tokio::sync::Notify; ...@@ -16,15 +17,109 @@ use tokio::sync::Notify;
use tokio::time::{Duration, timeout}; use tokio::time::{Duration, timeout};
use super::crd::DynamoWorkerMetadata; use super::crd::DynamoWorkerMetadata;
use super::utils::{PodInfo, extract_endpoint_info}; use super::utils::{KubeDiscoveryMode, PodInfo, extract_endpoint_info, extract_ready_containers};
const DEBOUNCE_DURATION: Duration = Duration::from_millis(500); const DEBOUNCE_DURATION: Duration = Duration::from_millis(500);
/// Readiness data source for the discovery daemon.
///
/// Pod mode watches EndpointSlices (one entry per ready pod).
/// Container mode watches Pods directly (one entry per ready container).
/// Both produce the same (instance_id, cr_key) tuples for snapshot correlation.
enum DiscoverySource {
EndpointSlice(reflector::Store<EndpointSlice>),
Pod(reflector::Store<Pod>),
}
impl DiscoverySource {
async fn new(pod_info: &PodInfo, kube_client: KubeClient, notify: Arc<Notify>) -> Self {
let labels = Config::default()
.labels("nvidia.com/dynamo-discovery-backend=kubernetes")
.labels("nvidia.com/dynamo-discovery-enabled=true");
match pod_info.mode {
KubeDiscoveryMode::Pod => {
let api: Api<EndpointSlice> = Api::namespaced(kube_client, &pod_info.pod_namespace);
let (reader, writer) = reflector::store();
tracing::info!("Daemon watching EndpointSlices (pod mode)");
let stream = reflector(writer, watcher(api, labels))
.default_backoff()
.touched_objects()
.for_each(move |res| {
match res {
Ok(obj) => {
tracing::debug!(
name = obj.metadata.name.as_deref().unwrap_or("?"),
"EndpointSlice reflector updated"
);
notify.notify_one();
}
Err(e) => {
tracing::warn!("EndpointSlice reflector error: {e}");
notify.notify_one();
}
}
futures::future::ready(())
});
tokio::spawn(stream);
Self::EndpointSlice(reader)
}
KubeDiscoveryMode::Container => {
let api: Api<Pod> = Api::namespaced(kube_client, &pod_info.pod_namespace);
let (reader, writer) = reflector::store();
tracing::info!("Daemon watching Pods (container mode)");
let stream = reflector(writer, watcher(api, labels))
.default_backoff()
.touched_objects()
.for_each(move |res| {
match res {
Ok(obj) => {
tracing::debug!(
name = obj.metadata.name.as_deref().unwrap_or("?"),
"Pod reflector updated"
);
notify.notify_one();
}
Err(e) => {
tracing::warn!("Pod reflector error: {e}");
notify.notify_one();
}
}
futures::future::ready(())
});
tokio::spawn(stream);
Self::Pod(reader)
}
}
}
fn ready_entries(&self) -> Vec<(u64, String)> {
match self {
Self::EndpointSlice(reader) => reader
.state()
.iter()
.flat_map(|s| extract_endpoint_info(s.as_ref()))
.collect(),
Self::Pod(reader) => reader
.state()
.iter()
.flat_map(|p| extract_ready_containers(p.as_ref()))
.collect(),
}
}
}
/// Discovers and aggregates metadata from DynamoWorkerMetadata CRs in the cluster /// Discovers and aggregates metadata from DynamoWorkerMetadata CRs in the cluster
#[derive(Clone)] #[derive(Clone)]
pub(super) struct DiscoveryDaemon { pub(super) struct DiscoveryDaemon {
kube_client: KubeClient, kube_client: KubeClient,
// This pod's info
pod_info: PodInfo, pod_info: PodInfo,
cancel_token: CancellationToken, cancel_token: CancellationToken,
} }
...@@ -42,67 +137,27 @@ impl DiscoveryDaemon { ...@@ -42,67 +137,27 @@ impl DiscoveryDaemon {
}) })
} }
/// Run the discovery daemon /// Run the discovery daemon.
/// ///
/// Watches both EndpointSlices (to know which pods are ready) and /// Watches a readiness source and DynamoWorkerMetadata CRs. An entry is
/// DynamoWorkerMetadata CRs (to get the metadata for each pod). /// included in the snapshot only if it appears ready AND has a matching CR.
/// A pod is included in the snapshot only if:
/// 1. It appears as ready in an EndpointSlice
/// 2. It has a corresponding DynamoWorkerMetadata CR
pub async fn run( pub async fn run(
self, self,
watch_tx: tokio::sync::watch::Sender<Arc<MetadataSnapshot>>, watch_tx: tokio::sync::watch::Sender<Arc<MetadataSnapshot>>,
) -> Result<()> { ) -> Result<()> {
tracing::info!("Discovery daemon starting"); tracing::info!("Discovery daemon starting");
// Create notify for watch-driven updates (shared by both reflectors)
let notify = Arc::new(Notify::new()); let notify = Arc::new(Notify::new());
// --- EndpointSlice Reflector --- // Readiness source — EndpointSlice or Pod depending on mode
let endpoint_slices: Api<EndpointSlice> = let source =
Api::namespaced(self.kube_client.clone(), &self.pod_info.pod_namespace); DiscoverySource::new(&self.pod_info, self.kube_client.clone(), notify.clone()).await;
let (ep_reader, ep_writer) = reflector::store();
let ep_watch_config = Config::default()
.labels("nvidia.com/dynamo-discovery-backend=kubernetes")
.labels("nvidia.com/dynamo-discovery-enabled=true");
tracing::info!( // DynamoWorkerMetadata CR reflector
"Daemon watching EndpointSlices with labels: nvidia.com/dynamo-discovery-backend=kubernetes, nvidia.com/dynamo-discovery-enabled=true"
);
let notify_ep = notify.clone();
let ep_reflector_stream = reflector(ep_writer, watcher(endpoint_slices, ep_watch_config))
.default_backoff()
.touched_objects()
.for_each(move |res| {
match res {
Ok(obj) => {
tracing::debug!(
slice_name = obj.metadata.name.as_deref().unwrap_or("unknown"),
"EndpointSlice reflector updated"
);
notify_ep.notify_one();
}
Err(e) => {
tracing::warn!("EndpointSlice reflector error: {e}");
notify_ep.notify_one();
}
}
// for_each expects a Future; ready(()) is an immediately-complete one
futures::future::ready(())
});
tokio::spawn(ep_reflector_stream);
// --- DynamoWorkerMetadata CR Reflector ---
let metadata_crs: Api<DynamoWorkerMetadata> = let metadata_crs: Api<DynamoWorkerMetadata> =
Api::namespaced(self.kube_client.clone(), &self.pod_info.pod_namespace); Api::namespaced(self.kube_client.clone(), &self.pod_info.pod_namespace);
let (cr_reader, cr_writer) = reflector::store(); let (cr_reader, cr_writer) = reflector::store();
// Watch all DynamoWorkerMetadata CRs in the namespace
let cr_watch_config = Config::default(); let cr_watch_config = Config::default();
tracing::info!( tracing::info!(
...@@ -128,7 +183,6 @@ impl DiscoveryDaemon { ...@@ -128,7 +183,6 @@ impl DiscoveryDaemon {
notify_cr.notify_one(); notify_cr.notify_one();
} }
} }
// for_each expects a Future; ready(()) is an immediately-complete one
futures::future::ready(()) futures::future::ready(())
}); });
...@@ -141,16 +195,12 @@ impl DiscoveryDaemon { ...@@ -141,16 +195,12 @@ impl DiscoveryDaemon {
loop { loop {
tokio::select! { tokio::select! {
_ = notify.notified() => { _ = notify.notified() => {
// Debounce: K8s can emit many events in quick succession
// Wait briefly to batch them into a single snapshot update.
tokio::time::sleep(DEBOUNCE_DURATION).await; tokio::time::sleep(DEBOUNCE_DURATION).await;
// Drain any permit that accumulated during the sleep
let _ = timeout(Duration::ZERO, notify.notified()).await; let _ = timeout(Duration::ZERO, notify.notified()).await;
tracing::trace!("Debounce window elapsed, processing snapshot"); tracing::trace!("Debounce window elapsed, processing snapshot");
match self.aggregate_snapshot(&ep_reader, &cr_reader, sequence).await { match self.aggregate_snapshot(&source, &cr_reader, sequence).await {
Ok(snapshot) => { Ok(snapshot) => {
if snapshot.has_changes_from(&prev_snapshot) { if snapshot.has_changes_from(&prev_snapshot) {
prev_snapshot = snapshot.clone(); prev_snapshot = snapshot.clone();
...@@ -165,7 +215,6 @@ impl DiscoveryDaemon { ...@@ -165,7 +215,6 @@ impl DiscoveryDaemon {
} }
Err(e) => { Err(e) => {
tracing::error!("Failed to aggregate snapshot: {e}"); tracing::error!("Failed to aggregate snapshot: {e}");
// Continue on errors - don't crash daemon
} }
} }
} }
...@@ -180,33 +229,22 @@ impl DiscoveryDaemon { ...@@ -180,33 +229,22 @@ impl DiscoveryDaemon {
Ok(()) Ok(())
} }
/// Aggregate metadata from EndpointSlices and DynamoWorkerMetadata CRs into a snapshot
///
/// A pod is included in the snapshot only if:
/// 1. It appears as ready in an EndpointSlice
/// 2. It has a corresponding DynamoWorkerMetadata CR (CR name = pod name)
async fn aggregate_snapshot( async fn aggregate_snapshot(
&self, &self,
ep_reader: &reflector::Store<EndpointSlice>, source: &DiscoverySource,
cr_reader: &reflector::Store<DynamoWorkerMetadata>, cr_reader: &reflector::Store<DynamoWorkerMetadata>,
sequence: u64, sequence: u64,
) -> Result<MetadataSnapshot> { ) -> Result<MetadataSnapshot> {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
// Extract ready pods from EndpointSlices: (instance_id, pod_name) let ready_entries = source.ready_entries();
let ready_pods: Vec<(u64, String)> = ep_reader
.state()
.iter()
.flat_map(|arc_slice| extract_endpoint_info(arc_slice.as_ref()))
.collect();
tracing::trace!( tracing::trace!(
"Daemon found {} ready pods from EndpointSlices", "Daemon found {} ready entries (mode={:?})",
ready_pods.len() ready_entries.len(),
self.pod_info.mode,
); );
// Single read of CR state to extract metadata and generations atomically
// We store (metadata, generation) tuples keyed by CR name (= pod name)
let cr_state = cr_reader.state(); let cr_state = cr_reader.state();
let mut cr_map: HashMap<String, (Arc<DiscoveryMetadata>, i64)> = HashMap::new(); let mut cr_map: HashMap<String, (Arc<DiscoveryMetadata>, i64)> = HashMap::new();
...@@ -217,7 +255,6 @@ impl DiscoveryDaemon { ...@@ -217,7 +255,6 @@ impl DiscoveryDaemon {
let generation = arc_cr.metadata.generation.unwrap_or(0); let generation = arc_cr.metadata.generation.unwrap_or(0);
// Deserialize the data field to DiscoveryMetadata
match serde_json::from_value::<DiscoveryMetadata>(arc_cr.spec.data.clone()) { match serde_json::from_value::<DiscoveryMetadata>(arc_cr.spec.data.clone()) {
Ok(metadata) => { Ok(metadata) => {
tracing::trace!("Loaded metadata from CR '{cr_name}'"); tracing::trace!("Loaded metadata from CR '{cr_name}'");
...@@ -235,26 +272,23 @@ impl DiscoveryDaemon { ...@@ -235,26 +272,23 @@ impl DiscoveryDaemon {
tracing::trace!("Daemon loaded {} DynamoWorkerMetadata CRs", cr_map.len()); tracing::trace!("Daemon loaded {} DynamoWorkerMetadata CRs", cr_map.len());
// Correlate: ready pod + CR exists = include in snapshot
// Both instances and generations are keyed by instance_id with matching keys
let mut instances: HashMap<u64, Arc<DiscoveryMetadata>> = HashMap::new(); let mut instances: HashMap<u64, Arc<DiscoveryMetadata>> = HashMap::new();
let mut generations: HashMap<u64, i64> = HashMap::new(); let mut generations: HashMap<u64, i64> = HashMap::new();
for (instance_id, pod_name) in ready_pods { for (instance_id, cr_key) in ready_entries {
// CR name is the pod name if let Some((metadata, generation)) = cr_map.get(&cr_key) {
if let Some((metadata, generation)) = cr_map.get(&pod_name) {
instances.insert(instance_id, metadata.clone()); instances.insert(instance_id, metadata.clone());
generations.insert(instance_id, *generation); generations.insert(instance_id, *generation);
tracing::trace!( tracing::trace!(
"Included pod '{}' (instance_id={:x}, generation={}) in snapshot", "Included '{}' (instance_id={:x}, generation={}) in snapshot",
pod_name, cr_key,
instance_id, instance_id,
generation generation
); );
} else { } else {
tracing::trace!( tracing::trace!(
"Skipping pod '{}' (instance_id={:x}): no DynamoWorkerMetadata CR found", "Skipping '{}' (instance_id={:x}): no DynamoWorkerMetadata CR found",
pod_name, cr_key,
instance_id instance_id
); );
} }
......
...@@ -2,23 +2,86 @@ ...@@ -2,23 +2,86 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use anyhow::Result; use anyhow::Result;
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::api::discovery::v1::EndpointSlice; use k8s_openapi::api::discovery::v1::EndpointSlice;
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::fs; use std::fs;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::path::Path; use std::path::Path;
/// Hash a pod name to get a consistent instance ID use crate::config::environment_names::discovery;
const INSTANCE_ID_MASK: u64 = 0x001F_FFFF_FFFF_FFFFu64;
const MAIN_CONTAINER_NAME: &str = "main";
/// Granularity at which a process registers itself with kube discovery.
///
/// - `Pod`: default. One identity per pod.
/// - `Container`: each container independently registers with the discovery plane.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum KubeDiscoveryMode {
    /// One discovery identity for the whole pod.
    Pod,
    /// One discovery identity per container.
    Container,
}

impl KubeDiscoveryMode {
    /// Resolve the mode from the `DYN_KUBE_DISCOVERY_MODE` environment variable.
    ///
    /// A variable that cannot be read selects `Pod`. The values `"pod"` and
    /// `"container"` select the matching variant; any other value is an error.
    pub fn from_env() -> Result<Self> {
        let raw = match std::env::var(discovery::DYN_KUBE_DISCOVERY_MODE) {
            Ok(value) => value,
            // Variable not readable: fall back to the default mode.
            Err(_) => return Ok(Self::Pod),
        };

        match raw.as_str() {
            "pod" => Ok(Self::Pod),
            "container" => Ok(Self::Container),
            other => anyhow::bail!(
                "Invalid DYN_KUBE_DISCOVERY_MODE value '{}'. Valid values: 'pod', 'container'",
                other
            ),
        }
    }
}
/// The entity a process registers as: a whole pod, or one container inside a pod.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum KubeDiscoveryTarget {
    /// Pod-level identity; holds the pod name.
    Pod(String),
    /// Container-level identity; holds the pod name, then the container name.
    Container(String, String),
}

impl KubeDiscoveryTarget {
    /// Name of the DynamoWorkerMetadata resource for this target.
    ///
    /// A pod target, and a container named `MAIN_CONTAINER_NAME`, map to the
    /// bare pod name. Every other container maps to `<pod>-<container>`.
    pub fn cr_name(&self) -> String {
        if let Self::Container(pod, container) = self {
            if container.as_str() != MAIN_CONTAINER_NAME {
                return format!("{}-{}", pod, container);
            }
        }
        self.pod_name().to_owned()
    }

    /// Instance ID obtained by hashing `cr_name()` and applying `INSTANCE_ID_MASK`.
    pub fn instance_id(&self) -> u64 {
        let name = self.cr_name();
        let mut state = DefaultHasher::new();
        name.hash(&mut state);
        state.finish() & INSTANCE_ID_MASK
    }

    /// Name of the pod this target lives in, regardless of variant.
    pub fn pod_name(&self) -> &str {
        match self {
            Self::Pod(name) => name.as_str(),
            Self::Container(name, _) => name.as_str(),
        }
    }
}
/// Hash a pod name to get a consistent instance ID (pod-level).
///
/// Used by C bindings (EPP) for pod-level worker ID mapping.
pub fn hash_pod_name(pod_name: &str) -> u64 { pub fn hash_pod_name(pod_name: &str) -> u64 {
// Clear top 11 bits to ensure it can be safely rounded to IEEE-754 f64
const INSTANCE_ID_MASK: u64 = 0x001F_FFFF_FFFF_FFFFu64;
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
pod_name.hash(&mut hasher); pod_name.hash(&mut hasher);
hasher.finish() & INSTANCE_ID_MASK hasher.finish() & INSTANCE_ID_MASK
} }
/// Extract endpoint information from an EndpointSlice /// Extract (instance_id, pod_name) tuples from an EndpointSlice for ready endpoints.
/// Returns (instance_id, pod_name) tuples for ready endpoints
pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String)> { pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String)> {
let mut result = Vec::new(); let mut result = Vec::new();
...@@ -42,53 +105,65 @@ pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String)> ...@@ -42,53 +105,65 @@ pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String)>
continue; continue;
} }
let instance_id = hash_pod_name(pod_name); let target = KubeDiscoveryTarget::Pod(pod_name.to_string());
result.push((target.instance_id(), target.cr_name()));
result.push((instance_id, pod_name.to_string()));
} }
result result
} }
/// List `(instance_id, cr_name)` for every container of `pod` whose status is ready.
///
/// Returns an empty vector when the pod has no name, no status, or no
/// container statuses.
pub(super) fn extract_ready_containers(pod: &Pod) -> Vec<(u64, String)> {
    let mut entries = Vec::new();

    let pod_name = pod.metadata.name.as_deref();
    let statuses = pod
        .status
        .as_ref()
        .and_then(|status| status.container_statuses.as_ref());

    if let (Some(pod_name), Some(statuses)) = (pod_name, statuses) {
        for status in statuses {
            if !status.ready {
                continue;
            }
            let target =
                KubeDiscoveryTarget::Container(pod_name.to_owned(), status.name.clone());
            entries.push((target.instance_id(), target.cr_name()));
        }
    }

    entries
}
/// Pod information extracted from environment.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(super) struct PodInfo { pub(super) struct PodInfo {
pub pod_name: String, pub pod_name: String,
pub pod_namespace: String, pub pod_namespace: String,
pub pod_uid: String, pub pod_uid: String,
pub system_port: u16, pub system_port: u16,
/// Kube discovery mode for this process, read from DYN_KUBE_DISCOVERY_MODE.
pub mode: KubeDiscoveryMode,
/// Discovery target for this process, derived from mode + pod/container identity.
pub target: KubeDiscoveryTarget,
} }
/// Default path for Kubernetes Downward API volume mount
const DEFAULT_PODINFO_PATH: &str = "/etc/podinfo"; const DEFAULT_PODINFO_PATH: &str = "/etc/podinfo";
impl PodInfo { impl PodInfo {
/// Read a value from a Downward API file, falling back to environment variable
fn read_from_file_or_env(file_path: &Path, env_var: &str) -> Option<String> { fn read_from_file_or_env(file_path: &Path, env_var: &str) -> Option<String> {
// First try reading from file (Downward API volume mount)
// This is preferred after CRIU restore since env vars contain stale values
if let Ok(content) = fs::read_to_string(file_path) { if let Ok(content) = fs::read_to_string(file_path) {
let value = content.trim().to_string(); let value = content.trim().to_string();
if !value.is_empty() { if !value.is_empty() {
return Some(value); return Some(value);
} }
} }
// Fall back to environment variable
std::env::var(env_var).ok() std::env::var(env_var).ok()
} }
/// Discover pod information from Kubernetes Downward API volume mounts or environment variables
///
/// This function first attempts to read pod identity from Downward API volume mounts
/// at /etc/podinfo/{pod_name, pod_uid, pod_namespace}. This is critical for CRIU
/// checkpoint/restore scenarios where environment variables contain stale values
/// from the checkpoint source pod.
///
/// If the Downward API files are not available, falls back to environment variables:
/// - `POD_NAME`: Name of the pod (required)
/// - `POD_UID`: UID of the pod (required for CR owner reference)
/// - `POD_NAMESPACE`: Namespace of the pod (defaults to "default")
pub fn from_env() -> Result<Self> { pub fn from_env() -> Result<Self> {
let podinfo_path = Path::new(DEFAULT_PODINFO_PATH); let podinfo_path = Path::new(DEFAULT_PODINFO_PATH);
...@@ -105,7 +180,20 @@ impl PodInfo { ...@@ -105,7 +180,20 @@ impl PodInfo {
"default".to_string() "default".to_string()
}); });
// Log where we got the pod info from for debugging let mode = KubeDiscoveryMode::from_env()?;
let target = match mode {
KubeDiscoveryMode::Pod => KubeDiscoveryTarget::Pod(pod_name.clone()),
KubeDiscoveryMode::Container => {
let container_name = std::env::var("CONTAINER_NAME").map_err(|_| {
anyhow::anyhow!(
"CONTAINER_NAME is required when DYN_KUBE_DISCOVERY_MODE=container"
)
})?;
KubeDiscoveryTarget::Container(pod_name.clone(), container_name)
}
};
if podinfo_path.join("pod_name").exists() { if podinfo_path.join("pod_name").exists() {
tracing::info!( tracing::info!(
"Pod identity loaded from Downward API volume mount at {}", "Pod identity loaded from Downward API volume mount at {}",
...@@ -115,7 +203,6 @@ impl PodInfo { ...@@ -115,7 +203,6 @@ impl PodInfo {
tracing::info!("Pod identity loaded from environment variables"); tracing::info!("Pod identity loaded from environment variables");
} }
// Read system server port from config
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
let system_port = config.system_port as u16; let system_port = config.system_port as u16;
...@@ -124,6 +211,8 @@ impl PodInfo { ...@@ -124,6 +211,8 @@ impl PodInfo {
pod_namespace, pod_namespace,
pod_uid, pod_uid,
system_port, system_port,
mode,
target,
}) })
} }
} }
...@@ -133,46 +222,32 @@ mod tests { ...@@ -133,46 +222,32 @@ mod tests {
use super::*; use super::*;
#[test] #[test]
fn test_hash_json_serialization_roundtrip() { fn test_pod_mode_backward_compat() {
// Verify that JSON serialization/deserialization preserves exact values // Pod mode must produce the same instance_id as hash_pod_name
let pod_names = [ // so existing deployments see no identity change on upgrade.
"worker-0", let target = KubeDiscoveryTarget::Pod("worker-0".into());
"worker-99999", assert_eq!(target.instance_id(), hash_pod_name("worker-0"));
"deployment-with-hash-suffix-a1b2c3d4e5f6", assert_eq!(target.cr_name(), "worker-0");
"fake-name-1-0-worker-nrdfv",
];
for pod_name in &pod_names {
let original_hash = hash_pod_name(pod_name);
let json = serde_json::to_string(&original_hash).unwrap();
let deserialized_hash: u64 = serde_json::from_str(&json).unwrap();
assert_eq!(
original_hash, deserialized_hash,
"JSON roundtrip changed hash value for pod_name={:?}: {} -> {} (json: {})",
pod_name, original_hash, deserialized_hash, json
);
}
} }
#[test] #[test]
fn test_hash_in_struct_serialization() { fn test_container_mode_main_uses_pod_identity() {
// Test serialization when the hash is embedded in a struct // A container named "main" uses pod-level identity so that
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)] // container-mode frontends can discover pod-mode workers.
struct WorkerInfo { let target = KubeDiscoveryTarget::Container("worker-0".into(), "main".into());
instance_id: u64, assert_eq!(target.instance_id(), hash_pod_name("worker-0"));
name: String, assert_eq!(target.cr_name(), "worker-0");
} }
let pod_name = "fake-name-1-0-worker-nrdfv";
let info = WorkerInfo {
instance_id: hash_pod_name(pod_name),
name: pod_name.to_string(),
};
let json = serde_json::to_string(&info).unwrap();
let deserialized: WorkerInfo = serde_json::from_str(&json).unwrap();
assert_eq!(info, deserialized); #[test]
fn test_container_mode_engine_gets_unique_identity() {
// Non-main containers get per-container identity so that
// failover engine containers are independently discoverable.
let e0 = KubeDiscoveryTarget::Container("worker-0".into(), "engine-0".into());
let e1 = KubeDiscoveryTarget::Container("worker-0".into(), "engine-1".into());
assert_eq!(e0.cr_name(), "worker-0-engine-0");
assert_eq!(e1.cr_name(), "worker-0-engine-1");
assert_ne!(e0.instance_id(), e1.instance_id());
assert_ne!(e0.instance_id(), hash_pod_name("worker-0"));
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment