Unverified Commit 9e2a2cc9 authored by Julien Mancuso's avatar Julien Mancuso Committed by GitHub
Browse files

feat: add epp component (#5611)


Signed-off-by: default avatarJulien Mancuso <jmancuso@nvidia.com>
parent 6271a31f
...@@ -51,9 +51,11 @@ import ( ...@@ -51,9 +51,11 @@ import (
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts" "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
commoncontroller "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common" commoncontroller "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo" "github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo/epp"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability" "github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
webhookvalidation "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook/validation" webhookvalidation "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook/validation"
rbacv1 "k8s.io/api/rbac/v1" rbacv1 "k8s.io/api/rbac/v1"
gaiev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
) )
type State string type State string
...@@ -94,6 +96,7 @@ type DynamoGraphDeploymentReconciler struct { ...@@ -94,6 +96,7 @@ type DynamoGraphDeploymentReconciler struct {
// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch // +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch // +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list // +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list
// +kubebuilder:rbac:groups=inference.networking.k8s.io,resources=inferencepools,verbs=get;list;watch;create;update;patch;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to // Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state. // move the current state of the cluster closer to the desired state.
...@@ -169,7 +172,7 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr ...@@ -169,7 +172,7 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
// Validate the DynamoGraphDeployment spec (defense in depth - only when webhooks are disabled) // Validate the DynamoGraphDeployment spec (defense in depth - only when webhooks are disabled)
if !r.Config.WebhooksEnabled { if !r.Config.WebhooksEnabled {
validator := webhookvalidation.NewDynamoGraphDeploymentValidator(dynamoDeployment) validator := webhookvalidation.NewDynamoGraphDeploymentValidator(dynamoDeployment)
if _, validationErr := validator.Validate(); validationErr != nil { if _, validationErr := validator.Validate(ctx); validationErr != nil {
logger.Error(validationErr, "DynamoGraphDeployment validation failed, refusing to reconcile") logger.Error(validationErr, "DynamoGraphDeployment validation failed, refusing to reconcile")
// Set validation error state and reason (defer will update status) // Set validation error state and reason (defer will update status)
...@@ -239,6 +242,22 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context ...@@ -239,6 +242,22 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
logger.Error(err, "Failed to ensure planner RBAC") logger.Error(err, "Failed to ensure planner RBAC")
return ReconcileResult{}, fmt.Errorf("failed to ensure planner RBAC: %w", err) return ReconcileResult{}, fmt.Errorf("failed to ensure planner RBAC: %w", err)
} }
// Ensure EPP RBAC exists in cluster-wide mode if EPP service is present
if dynamoDeployment.HasEPPService() {
if r.Config.RBAC.EPPClusterRoleName == "" {
return ReconcileResult{}, fmt.Errorf("EPP ClusterRole name is required in cluster-wide mode when EPP service is present")
}
if err := r.RBACManager.EnsureServiceAccountWithRBAC(
ctx,
dynamoDeployment.Namespace,
consts.EPPServiceAccountName,
r.Config.RBAC.EPPClusterRoleName,
); err != nil {
logger.Error(err, "Failed to ensure EPP RBAC")
return ReconcileResult{}, fmt.Errorf("failed to ensure EPP RBAC: %w", err)
}
}
} }
// Reconcile top-level PVCs first // Reconcile top-level PVCs first
...@@ -262,6 +281,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context ...@@ -262,6 +281,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
return ReconcileResult{}, fmt.Errorf("failed to reconcile K8s discovery resources: %w", err) return ReconcileResult{}, fmt.Errorf("failed to reconcile K8s discovery resources: %w", err)
} }
// Reconcile EPP resources (ConfigMaps, Services, InferencePools) if EPP service exists
err = r.reconcileEPPResources(ctx, dynamoDeployment)
if err != nil {
logger.Error(err, "Failed to reconcile EPP resources")
return ReconcileResult{}, fmt.Errorf("failed to reconcile EPP resources: %w", err)
}
// Determine if any service is multinode // Determine if any service is multinode
hasMultinode := dynamoDeployment.HasAnyMultinodeService() hasMultinode := dynamoDeployment.HasAnyMultinodeService()
...@@ -1164,6 +1190,60 @@ func generateAdapterName(dgdName, serviceName string) string { ...@@ -1164,6 +1190,60 @@ func generateAdapterName(dgdName, serviceName string) string {
return fmt.Sprintf("%s-%s", dgdName, strings.ToLower(serviceName)) return fmt.Sprintf("%s-%s", dgdName, strings.ToLower(serviceName))
} }
// reconcileEPPResources reconciles the EPP-specific resources owned by the DGD:
// the generated EPP ConfigMap (skipped when the user supplies a ConfigMapRef)
// and the InferencePool. It is a no-op when the DGD declares no EPP service.
// Any generation or sync failure is logged and returned wrapped.
func (r *DynamoGraphDeploymentReconciler) reconcileEPPResources(ctx context.Context, dgd *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	lg := log.FromContext(ctx)

	componentName, eppService, found := dgd.GetEPPService()
	if !found {
		lg.V(1).Info("No EPP service defined, skipping EPP resource reconciliation")
		return nil
	}
	lg.Info("Reconciling EPP resources", "componentName", componentName)

	// Step 1: ConfigMap. When the user references their own ConfigMap there is
	// nothing to generate, so this step is skipped entirely.
	userSuppliedConfigMap := eppService.EPPConfig != nil && eppService.EPPConfig.ConfigMapRef != nil
	if !userSuppliedConfigMap {
		generated, genErr := epp.GenerateConfigMap(ctx, dgd, componentName, eppService.EPPConfig)
		if genErr != nil {
			lg.Error(genErr, "Failed to generate EPP ConfigMap")
			return fmt.Errorf("failed to generate EPP ConfigMap: %w", genErr)
		}
		if generated != nil {
			if _, _, syncErr := commoncontroller.SyncResource(ctx, r, dgd, func(ctx context.Context) (*corev1.ConfigMap, bool, error) {
				return generated, false, nil
			}); syncErr != nil {
				lg.Error(syncErr, "Failed to sync EPP ConfigMap")
				return fmt.Errorf("failed to sync EPP ConfigMap: %w", syncErr)
			}
		}
	}

	// Step 2: InferencePool.
	// The EPP Service itself is not created here; per the original note it comes
	// from the standard component reconciliation via GenerateComponentService()
	// in graph.go (ComponentTypeEPP case).
	pool, genErr := epp.GenerateInferencePool(
		dgd,
		componentName,
		dynamo.GetDynamoComponentName(dgd, componentName),
		eppService.EPPConfig,
	)
	if genErr != nil {
		lg.Error(genErr, "Failed to generate EPP InferencePool")
		return fmt.Errorf("failed to generate EPP InferencePool: %w", genErr)
	}

	if _, _, syncErr := commoncontroller.SyncResource(ctx, r, dgd, func(ctx context.Context) (*gaiev1.InferencePool, bool, error) {
		return pool, false, nil
	}); syncErr != nil {
		lg.Error(syncErr, "Failed to sync EPP InferencePool")
		return fmt.Errorf("failed to sync EPP InferencePool: %w", syncErr)
	}

	lg.Info("Successfully reconciled EPP resources", "poolName", pool.GetName())
	return nil
}
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error { func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
// for now doing nothing // for now doing nothing
return nil return nil
......
...@@ -92,6 +92,8 @@ type RBACConfig struct { ...@@ -92,6 +92,8 @@ type RBACConfig struct {
PlannerClusterRoleName string PlannerClusterRoleName string
// DGDRProfilingClusterRoleName is the name of the ClusterRole for DGDR profiling jobs (cluster-wide mode only) // DGDRProfilingClusterRoleName is the name of the ClusterRole for DGDR profiling jobs (cluster-wide mode only)
DGDRProfilingClusterRoleName string DGDRProfilingClusterRoleName string
// EPPClusterRoleName is the name of the ClusterRole for EPP (cluster-wide mode only)
EPPClusterRoleName string
} }
type IngressConfig struct { type IngressConfig struct {
...@@ -129,6 +131,12 @@ func DetectKaiSchedulerAvailability(ctx context.Context, mgr ctrl.Manager) bool ...@@ -129,6 +131,12 @@ func DetectKaiSchedulerAvailability(ctx context.Context, mgr ctrl.Manager) bool
return detectAPIGroupAvailability(ctx, mgr, "scheduling.run.ai") return detectAPIGroupAvailability(ctx, mgr, "scheduling.run.ai")
} }
// DetectInferencePoolAvailability reports whether the Gateway API Inference
// Extension appears to be installed, determined by asking
// detectAPIGroupAvailability about the inference.networking.k8s.io API group.
func DetectInferencePoolAvailability(ctx context.Context, mgr ctrl.Manager) bool {
	const inferencePoolAPIGroup = "inference.networking.k8s.io"
	available := detectAPIGroupAvailability(ctx, mgr, inferencePoolAPIGroup)
	return available
}
// detectAPIGroupAvailability checks if a specific API group is registered in the cluster // detectAPIGroupAvailability checks if a specific API group is registered in the cluster
func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName string) bool { func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName string) bool {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
package dynamo package dynamo
import ( import (
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts" commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
...@@ -31,6 +32,8 @@ func ComponentDefaultsFactory(componentType string) ComponentDefaults { ...@@ -31,6 +32,8 @@ func ComponentDefaultsFactory(componentType string) ComponentDefaults {
return NewWorkerDefaults() return NewWorkerDefaults()
case commonconsts.ComponentTypePlanner: case commonconsts.ComponentTypePlanner:
return NewPlannerDefaults() return NewPlannerDefaults()
case commonconsts.ComponentTypeEPP:
return NewEPPDefaults()
default: default:
return &BaseComponentDefaults{} return &BaseComponentDefaults{}
} }
...@@ -46,6 +49,7 @@ type ComponentContext struct { ...@@ -46,6 +49,7 @@ type ComponentContext struct {
ParentGraphDeploymentName string ParentGraphDeploymentName string
ParentGraphDeploymentNamespace string ParentGraphDeploymentNamespace string
DiscoveryBackend string DiscoveryBackend string
EPPConfig *v1alpha1.EPPConfig
} }
func (b *BaseComponentDefaults) GetBaseContainer(context ComponentContext) (corev1.Container, error) { func (b *BaseComponentDefaults) GetBaseContainer(context ComponentContext) (corev1.Container, error) {
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package dynamo
import (
"fmt"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo/epp"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
)
// EPPDefaults supplies the ComponentDefaults behaviour for EPP (Endpoint Picker
// Plugin) components. It embeds *BaseComponentDefaults and overrides the
// container and pod-spec builders.
type EPPDefaults struct {
	*BaseComponentDefaults
}

// NewEPPDefaults returns an EPPDefaults backed by a fresh, empty
// BaseComponentDefaults.
func NewEPPDefaults() *EPPDefaults {
	base := &BaseComponentDefaults{}
	return &EPPDefaults{BaseComponentDefaults: base}
}
// GetBaseContainer returns the default main container for an EPP component.
//
// Starting from the common container, it sets the container ports (EPP gRPC,
// gRPC health, metrics), gRPC liveness and readiness probes against the health
// port, the EPP environment variables, the default command-line arguments, and
// the volume mounts for the EPP config and the HuggingFace cache.
// The returned error is always nil.
func (e *EPPDefaults) GetBaseContainer(context ComponentContext) (corev1.Container, error) {
	const (
		healthPort    = 9003 // gRPC health check endpoint
		metricsPort   = 9090 // metrics endpoint
		healthService = "inference-extension"
	)

	// EPP speaks gRPC, so the probes are gRPC probes rather than HTTP ones.
	// Each call builds a new Probe so liveness and readiness never share state.
	newHealthProbe := func() *corev1.Probe {
		return &corev1.Probe{
			ProbeHandler: corev1.ProbeHandler{
				GRPC: &corev1.GRPCAction{
					Port:    healthPort,
					Service: ptr.To(healthService),
				},
			},
			InitialDelaySeconds: 5,
			PeriodSeconds:       10,
		}
	}

	c := e.getCommonContainer(context)

	c.Ports = []corev1.ContainerPort{
		// gRPC endpoint used for InferencePool communication.
		{Protocol: corev1.ProtocolTCP, Name: commonconsts.EPPGRPCPortName, ContainerPort: commonconsts.EPPGRPCPort},
		{Protocol: corev1.ProtocolTCP, Name: "grpc-health", ContainerPort: healthPort},
		{Protocol: corev1.ProtocolTCP, Name: commonconsts.DynamoMetricsPortName, ContainerPort: metricsPort},
	}
	c.LivenessProbe = newHealthProbe()
	c.ReadinessProbe = newHealthProbe()

	// EPP-specific environment, appended after whatever the common container set.
	c.Env = append(c.Env,
		corev1.EnvVar{Name: "DYN_KV_BLOCK_SIZE", Value: "16"},
		// Seconds to wait for workers to register.
		corev1.EnvVar{Name: "DYN_DISCOVERY_TIMEOUT_SEC", Value: "300"},
		corev1.EnvVar{Name: "USE_STREAMING", Value: "true"},
		corev1.EnvVar{Name: "RUST_LOG", Value: "debug,dynamo_llm::kv_router=trace"},
	)

	// Default args. Per the original note these can be overridden through
	// extraPodSpec.mainContainer.args (mergo.WithOverride).
	c.Command = []string{}
	c.Args = []string{
		"-pool-name", epp.GetPoolName(context.ParentGraphDeploymentName, context.EPPConfig),
		"-pool-namespace", epp.GetPoolNamespace(context.ParentGraphDeploymentNamespace, context.EPPConfig),
		"-pool-group", epp.InferencePoolGroup,
		"-v", "4",
		"--zap-encoder", "json",
		"-grpc-port", fmt.Sprintf("%d", commonconsts.EPPGRPCPort),
		"-grpc-health-port", fmt.Sprintf("%d", healthPort),
		"-config-file", epp.GetConfigFilePath(),
	}

	// Mounts: the EPP config, then the HuggingFace cache directory used for
	// model config downloads.
	_, configMount := epp.GetConfigMapVolumeMount(context.ParentGraphDeploymentName, context.EPPConfig)
	c.VolumeMounts = append(c.VolumeMounts,
		configMount,
		corev1.VolumeMount{Name: "hf-cache", MountPath: "/home/nonroot/.cache"},
	)

	return c, nil
}
// GetBasePodSpec returns the default pod spec for an EPP component.
//
// On top of the common pod spec it sets the EPP service account, a 130-second
// termination grace period, and appends two volumes: the EPP config volume and
// an emptyDir named "hf-cache" for the HuggingFace cache.
// The returned error is always nil.
func (e *EPPDefaults) GetBasePodSpec(context ComponentContext) (corev1.PodSpec, error) {
	spec := e.getCommonPodSpec()

	// EPP runs under a shared service account, like the planner.
	spec.ServiceAccountName = commonconsts.EPPServiceAccountName

	// Longer grace period so EPP can shut down gracefully.
	gracePeriod := int64(130)
	spec.TerminationGracePeriodSeconds = ptr.To(gracePeriod)

	configVolume, _ := epp.GetConfigMapVolumeMount(context.ParentGraphDeploymentName, context.EPPConfig)
	spec.Volumes = append(spec.Volumes,
		configVolume,
		// Scratch space for downloaded model config files.
		corev1.Volume{
			Name: "hf-cache",
			VolumeSource: corev1.VolumeSource{
				EmptyDir: &corev1.EmptyDirVolumeSource{},
			},
		},
	)

	return spec, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package epp
import (
"context"
"fmt"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apixv1alpha1 "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1"
"sigs.k8s.io/yaml"
)
const (
// ConfigMapSuffix is appended to DGD name to create EPP ConfigMap name
ConfigMapSuffix = "epp-config"
// ConfigKey is the key in the ConfigMap containing the EPP configuration
ConfigKey = "epp-config-dynamo.yaml"
)
// GenerateConfigMap builds the ConfigMap that carries the EPP configuration.
//
// Outcomes:
//   - eppConfig.ConfigMapRef set: returns (nil, nil); the user owns the ConfigMap.
//   - neither ConfigMapRef nor Config set (or eppConfig nil): returns an error,
//     since there is no default configuration.
//   - eppConfig.Config set: the config is marshalled to YAML and stored under
//     ConfigKey in a ConfigMap named after the DGD, in the DGD's namespace.
//
// ctx is accepted but not read by this function.
func GenerateConfigMap(
	ctx context.Context,
	dgd *v1alpha1.DynamoGraphDeployment,
	componentName string,
	eppConfig *v1alpha1.EPPConfig,
) (*corev1.ConfigMap, error) {
	switch {
	case eppConfig != nil && eppConfig.ConfigMapRef != nil:
		// User-managed ConfigMap: nothing to generate.
		return nil, nil
	case eppConfig == nil || eppConfig.Config == nil:
		return nil, fmt.Errorf("EPP configuration is required: either eppConfig.configMapRef or eppConfig.config must be specified")
	}

	// Inline config supplied as a Go struct: serialize it to YAML.
	serialized, marshalErr := marshalEndpointPickerConfig(eppConfig.Config)
	if marshalErr != nil {
		return nil, fmt.Errorf("failed to marshal EPP config: %w", marshalErr)
	}

	meta := metav1.ObjectMeta{
		Name:      GetConfigMapName(dgd.Name),
		Namespace: dgd.Namespace,
		Labels: map[string]string{
			consts.KubeLabelDynamoGraphDeploymentName: dgd.Name,
			consts.KubeLabelDynamoComponent:           componentName,
			consts.KubeLabelDynamoComponentType:       consts.ComponentTypeEPP,
		},
	}
	return &corev1.ConfigMap{
		ObjectMeta: meta,
		Data:       map[string]string{ConfigKey: serialized},
	}, nil
}
// GetConfigMapName derives the generated EPP ConfigMap name for a DGD:
// the DGD name and ConfigMapSuffix joined by a hyphen.
func GetConfigMapName(dgdName string) string {
	const nameFormat = "%s-%s"
	name := fmt.Sprintf(nameFormat, dgdName, ConfigMapSuffix)
	return name
}
// marshalEndpointPickerConfig serializes an EndpointPickerConfig to YAML with
// its apiVersion and kind populated.
//
// The TypeMeta is stamped onto a shallow copy rather than onto *config: the
// pointer passed in refers to the config embedded in the DGD spec, and writing
// through it would modify the caller's object as a side effect of marshalling.
//
// Returns an error if config is nil or if YAML marshalling fails.
func marshalEndpointPickerConfig(config *apixv1alpha1.EndpointPickerConfig) (string, error) {
	if config == nil {
		return "", fmt.Errorf("failed to marshal EndpointPickerConfig to YAML: config is nil")
	}

	// Only the top-level TypeMeta field is replaced, so a shallow copy is enough
	// to leave the caller's struct untouched.
	stamped := *config
	stamped.TypeMeta = metav1.TypeMeta{
		APIVersion: apixv1alpha1.SchemeGroupVersion.String(),
		Kind:       "EndpointPickerConfig",
	}

	yamlBytes, err := yaml.Marshal(&stamped)
	if err != nil {
		return "", fmt.Errorf("failed to marshal EndpointPickerConfig to YAML: %w", err)
	}
	return string(yamlBytes), nil
}
// GetConfigMapVolumeMount returns the pod volume and the container volume mount
// that expose the EPP configuration.
//
// By default the volume points at the generated ConfigMap
// (<dgdName>-<ConfigMapSuffix>) and its ConfigKey entry. When
// eppConfig.ConfigMapRef is set, the referenced ConfigMap is used instead, and
// its Key (if non-empty) selects the entry. In every case the selected entry is
// projected to the file name ConfigKey, and the volume is mounted read-only at
// /etc/epp.
func GetConfigMapVolumeMount(dgdName string, eppConfig *v1alpha1.EPPConfig) (corev1.Volume, corev1.VolumeMount) {
	const volumeName = "epp-config"

	sourceName, sourceKey := dgdName+"-"+ConfigMapSuffix, ConfigKey
	if eppConfig != nil {
		if ref := eppConfig.ConfigMapRef; ref != nil {
			// User-provided ConfigMap takes precedence over the generated one.
			sourceName = ref.Name
			// A custom key is optional; an empty Key keeps the default ConfigKey.
			if ref.Key != "" {
				sourceKey = ref.Key
			}
		}
	}

	// Project the chosen key to a fixed file name so the in-container path does
	// not depend on which key the source ConfigMap uses.
	projection := corev1.KeyToPath{Key: sourceKey, Path: ConfigKey}

	return corev1.Volume{
			Name: volumeName,
			VolumeSource: corev1.VolumeSource{
				ConfigMap: &corev1.ConfigMapVolumeSource{
					LocalObjectReference: corev1.LocalObjectReference{Name: sourceName},
					Items:                []corev1.KeyToPath{projection},
				},
			},
		}, corev1.VolumeMount{
			Name:      volumeName,
			MountPath: "/etc/epp",
			ReadOnly:  true,
		}
}
// GetConfigFilePath returns the in-container path of the EPP config file:
// the /etc/epp directory followed by ConfigKey.
// The path is independent of the source ConfigMap key because
// GetConfigMapVolumeMount always projects the selected key to ConfigKey.
func GetConfigFilePath() string {
	const mountDir = "/etc/epp"
	return fmt.Sprintf("%s/%s", mountDir, ConfigKey)
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package epp
import (
"fmt"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
gaiev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
)
const (
// InferencePoolGroup is the API group for InferencePool (stable API)
// Using the stable v1 API group instead of the experimental x-k8s.io group
InferencePoolGroup = gaiev1.GroupName
)
// InferencePoolGVK is the GroupVersionKind for InferencePool (stable API)
var InferencePoolGVK = schema.GroupVersionKind{
Group: gaiev1.GroupName,
Version: gaiev1.GroupVersion.Version,
Kind: "InferencePool",
}
// GenerateInferencePool builds the InferencePool for a DGD's EPP component
// using the typed inference.networking.k8s.io v1 API.
//
// The pool is named and namespaced via GetPoolName / GetPoolNamespace, selects
// pods labelled as Frontend components in the Dynamo namespace computed for the
// EPP service, targets the Dynamo service port, and points its endpoint picker
// at the Service named eppServiceName on the EPP gRPC port.
// Having the operator create the pool resolves the chicken-and-egg problem
// noted in the original: EPP needs the pool name, and the pool needs the EPP
// service. The returned error is always nil.
func GenerateInferencePool(
	dgd *v1alpha1.DynamoGraphDeployment,
	componentName string,
	eppServiceName string,
	eppConfig *v1alpha1.EPPConfig,
) (*gaiev1.InferencePool, error) {
	name := GetPoolName(dgd.Name, eppConfig)
	namespace := GetPoolNamespace(dgd.Namespace, eppConfig)
	targetDynamoNamespace := dgd.GetDynamoNamespaceForService(dgd.Spec.Services[componentName])

	meta := metav1.ObjectMeta{
		Name:      name,
		Namespace: namespace,
		Labels: map[string]string{
			consts.KubeLabelDynamoGraphDeploymentName: dgd.Name,
			consts.KubeLabelDynamoComponent:           componentName,
			consts.KubeLabelDynamoComponentType:       consts.ComponentTypeEPP,
		},
	}

	// Pool members: Frontend pods within the computed Dynamo namespace.
	frontendSelector := gaiev1.LabelSelector{
		MatchLabels: map[gaiev1.LabelKey]gaiev1.LabelValue{
			consts.KubeLabelDynamoComponentType: consts.ComponentTypeFrontend,
			consts.KubeLabelDynamoNamespace:     gaiev1.LabelValue(targetDynamoNamespace),
		},
	}

	// The endpoint picker is the EPP Service, reached on its gRPC port.
	picker := gaiev1.EndpointPickerRef{
		Kind: "Service",
		Name: gaiev1.ObjectName(eppServiceName),
		Port: &gaiev1.Port{Number: consts.EPPGRPCPort},
	}

	return &gaiev1.InferencePool{
		ObjectMeta: meta,
		Spec: gaiev1.InferencePoolSpec{
			// Frontend port.
			TargetPorts:       []gaiev1.Port{{Number: consts.DynamoServicePort}},
			Selector:          frontendSelector,
			EndpointPickerRef: picker,
		},
	}, nil
}
// GetPoolName derives the InferencePool name for a DGD: the DGD name followed
// by "-pool". eppConfig is accepted but not read by this function.
func GetPoolName(dgdName string, eppConfig *v1alpha1.EPPConfig) string {
	const poolSuffix = "pool"
	return fmt.Sprintf("%s-%s", dgdName, poolSuffix)
}
// GetPoolNamespace returns the namespace in which the DGD's InferencePool lives.
// It returns dgdNamespace unchanged, so the pool shares the DGD's namespace.
// eppConfig is accepted but not read by this function.
func GetPoolNamespace(dgdNamespace string, eppConfig *v1alpha1.EPPConfig) string {
return dgdNamespace
}
...@@ -552,14 +552,24 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy ...@@ -552,14 +552,24 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy
componentName = GetDynamoComponentName(dynamoDeployment, componentName) componentName = GetDynamoComponentName(dynamoDeployment, componentName)
var servicePort corev1.ServicePort var servicePort corev1.ServicePort
if component.ComponentType == commonconsts.ComponentTypeFrontend { switch component.ComponentType {
case commonconsts.ComponentTypeFrontend:
servicePort = corev1.ServicePort{ servicePort = corev1.ServicePort{
Name: commonconsts.DynamoServicePortName, Name: commonconsts.DynamoServicePortName,
Port: commonconsts.DynamoServicePort, Port: commonconsts.DynamoServicePort,
TargetPort: intstr.FromString(commonconsts.DynamoContainerPortName), TargetPort: intstr.FromString(commonconsts.DynamoContainerPortName),
Protocol: corev1.ProtocolTCP, Protocol: corev1.ProtocolTCP,
} }
} else { case commonconsts.ComponentTypeEPP:
// EPP only exposes the gRPC endpoint for InferencePool communication
servicePort = corev1.ServicePort{
Name: commonconsts.EPPGRPCPortName,
Port: commonconsts.EPPGRPCPort,
TargetPort: intstr.FromInt(commonconsts.EPPGRPCPort),
Protocol: corev1.ProtocolTCP,
AppProtocol: ptr.To("http2"),
}
default:
servicePort = corev1.ServicePort{ servicePort = corev1.ServicePort{
Name: commonconsts.DynamoSystemPortName, Name: commonconsts.DynamoSystemPortName,
Port: commonconsts.DynamoSystemPort, Port: commonconsts.DynamoSystemPort,
...@@ -1085,6 +1095,7 @@ func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentShare ...@@ -1085,6 +1095,7 @@ func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentShare
ParentGraphDeploymentNamespace: namespace, ParentGraphDeploymentNamespace: namespace,
DiscoveryBackend: discoveryBackend, DiscoveryBackend: discoveryBackend,
DynamoNamespace: dynamoNamespace, DynamoNamespace: dynamoNamespace,
EPPConfig: component.EPPConfig,
} }
return componentContext return componentContext
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package validation package validation
import ( import (
"context"
"fmt" "fmt"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
...@@ -38,15 +39,16 @@ func NewDynamoComponentDeploymentValidator(deployment *nvidiacomv1alpha1.DynamoC ...@@ -38,15 +39,16 @@ func NewDynamoComponentDeploymentValidator(deployment *nvidiacomv1alpha1.DynamoC
} }
// Validate performs stateless validation on the DynamoComponentDeployment. // Validate performs stateless validation on the DynamoComponentDeployment.
// Context is required for operations that may need to query the cluster (e.g., CRD checks).
// Returns warnings and error. // Returns warnings and error.
func (v *DynamoComponentDeploymentValidator) Validate() (admission.Warnings, error) { func (v *DynamoComponentDeploymentValidator) Validate(ctx context.Context) (admission.Warnings, error) {
// Validate shared spec fields using SharedSpecValidator // Validate shared spec fields using SharedSpecValidator
calculatedNamespace := v.deployment.GetDynamoNamespace() calculatedNamespace := v.deployment.GetDynamoNamespace()
sharedValidator := NewSharedSpecValidator(&v.deployment.Spec.DynamoComponentDeploymentSharedSpec, "spec", calculatedNamespace) sharedValidator := NewSharedSpecValidator(&v.deployment.Spec.DynamoComponentDeploymentSharedSpec, "spec", calculatedNamespace)
// DCD-specific validation would go here (currently none) // DCD-specific validation would go here (currently none)
return sharedValidator.Validate() return sharedValidator.Validate(ctx)
} }
// ValidateUpdate performs stateful validation comparing old and new DynamoComponentDeployment. // ValidateUpdate performs stateful validation comparing old and new DynamoComponentDeployment.
......
...@@ -59,7 +59,7 @@ func (h *DynamoComponentDeploymentHandler) ValidateCreate(ctx context.Context, o ...@@ -59,7 +59,7 @@ func (h *DynamoComponentDeploymentHandler) ValidateCreate(ctx context.Context, o
// Create validator and perform validation // Create validator and perform validation
validator := NewDynamoComponentDeploymentValidator(deployment) validator := NewDynamoComponentDeploymentValidator(deployment)
return validator.Validate() return validator.Validate(ctx)
} }
// ValidateUpdate validates a DynamoComponentDeployment update request. // ValidateUpdate validates a DynamoComponentDeployment update request.
...@@ -88,7 +88,7 @@ func (h *DynamoComponentDeploymentHandler) ValidateUpdate(ctx context.Context, o ...@@ -88,7 +88,7 @@ func (h *DynamoComponentDeploymentHandler) ValidateUpdate(ctx context.Context, o
validator := NewDynamoComponentDeploymentValidator(newDeployment) validator := NewDynamoComponentDeploymentValidator(newDeployment)
// Validate stateless rules // Validate stateless rules
warnings, err := validator.Validate() warnings, err := validator.Validate(ctx)
if err != nil { if err != nil {
return warnings, err return warnings, err
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package validation package validation
import ( import (
"context"
"testing" "testing"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
...@@ -133,7 +134,7 @@ func TestDynamoComponentDeploymentValidator_Validate(t *testing.T) { ...@@ -133,7 +134,7 @@ func TestDynamoComponentDeploymentValidator_Validate(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
validator := NewDynamoComponentDeploymentValidator(tt.deployment) validator := NewDynamoComponentDeploymentValidator(tt.deployment)
_, err := validator.Validate() _, err := validator.Validate(context.Background())
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("DynamoComponentDeploymentValidator.Validate() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("DynamoComponentDeploymentValidator.Validate() error = %v, wantErr %v", err, tt.wantErr)
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package validation package validation
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"sort" "sort"
...@@ -25,6 +26,7 @@ import ( ...@@ -25,6 +26,7 @@ import (
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
internalwebhook "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook" internalwebhook "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook"
authenticationv1 "k8s.io/api/authentication/v1" authenticationv1 "k8s.io/api/authentication/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission" "sigs.k8s.io/controller-runtime/pkg/webhook/admission"
) )
...@@ -32,18 +34,29 @@ import ( ...@@ -32,18 +34,29 @@ import (
// This validator can be used by both webhooks and controllers for consistent validation. // This validator can be used by both webhooks and controllers for consistent validation.
type DynamoGraphDeploymentValidator struct { type DynamoGraphDeploymentValidator struct {
deployment *nvidiacomv1alpha1.DynamoGraphDeployment deployment *nvidiacomv1alpha1.DynamoGraphDeployment
mgr ctrl.Manager // Optional: for API group detection via discovery client
} }
// NewDynamoGraphDeploymentValidator creates a new validator for DynamoGraphDeployment. // NewDynamoGraphDeploymentValidator creates a new validator for DynamoGraphDeployment.
func NewDynamoGraphDeploymentValidator(deployment *nvidiacomv1alpha1.DynamoGraphDeployment) *DynamoGraphDeploymentValidator { func NewDynamoGraphDeploymentValidator(deployment *nvidiacomv1alpha1.DynamoGraphDeployment) *DynamoGraphDeploymentValidator {
return &DynamoGraphDeploymentValidator{ return &DynamoGraphDeploymentValidator{
deployment: deployment, deployment: deployment,
mgr: nil,
}
}
// NewDynamoGraphDeploymentValidatorWithManager creates a validator with a manager for API group detection.
func NewDynamoGraphDeploymentValidatorWithManager(deployment *nvidiacomv1alpha1.DynamoGraphDeployment, mgr ctrl.Manager) *DynamoGraphDeploymentValidator {
return &DynamoGraphDeploymentValidator{
deployment: deployment,
mgr: mgr,
} }
} }
// Validate performs stateless validation on the DynamoGraphDeployment. // Validate performs stateless validation on the DynamoGraphDeployment.
// Context is required for operations that may need to query the cluster (e.g., CRD checks).
// Returns warnings and error. // Returns warnings and error.
func (v *DynamoGraphDeploymentValidator) Validate() (admission.Warnings, error) { func (v *DynamoGraphDeploymentValidator) Validate(ctx context.Context) (admission.Warnings, error) {
// Validate that at least one service is specified // Validate that at least one service is specified
if len(v.deployment.Spec.Services) == 0 { if len(v.deployment.Spec.Services) == 0 {
return nil, fmt.Errorf("spec.services must have at least one service") return nil, fmt.Errorf("spec.services must have at least one service")
...@@ -63,7 +76,7 @@ func (v *DynamoGraphDeploymentValidator) Validate() (admission.Warnings, error) ...@@ -63,7 +76,7 @@ func (v *DynamoGraphDeploymentValidator) Validate() (admission.Warnings, error)
// Validate each service // Validate each service
for serviceName, service := range v.deployment.Spec.Services { for serviceName, service := range v.deployment.Spec.Services {
warnings, err := v.validateService(serviceName, service) warnings, err := v.validateService(ctx, serviceName, service)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -223,12 +236,19 @@ func (v *DynamoGraphDeploymentValidator) validateReplicasChanges(old *nvidiacomv ...@@ -223,12 +236,19 @@ func (v *DynamoGraphDeploymentValidator) validateReplicasChanges(old *nvidiacomv
// validateService validates a single service configuration using SharedSpecValidator. // validateService validates a single service configuration using SharedSpecValidator.
// Returns warnings and error. // Returns warnings and error.
func (v *DynamoGraphDeploymentValidator) validateService(serviceName string, service *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec) (admission.Warnings, error) { func (v *DynamoGraphDeploymentValidator) validateService(ctx context.Context, serviceName string, service *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec) (admission.Warnings, error) {
// Use SharedSpecValidator to validate service spec (which is a DynamoComponentDeploymentSharedSpec) // Use SharedSpecValidator to validate service spec (which is a DynamoComponentDeploymentSharedSpec)
fieldPath := fmt.Sprintf("spec.services[%s]", serviceName) fieldPath := fmt.Sprintf("spec.services[%s]", serviceName)
calculatedNamespace := v.deployment.GetDynamoNamespaceForService(service) calculatedNamespace := v.deployment.GetDynamoNamespaceForService(service)
sharedValidator := NewSharedSpecValidator(service, fieldPath, calculatedNamespace)
return sharedValidator.Validate() var sharedValidator *SharedSpecValidator
if v.mgr != nil {
sharedValidator = NewSharedSpecValidatorWithManager(service, fieldPath, calculatedNamespace, v.mgr)
} else {
sharedValidator = NewSharedSpecValidator(service, fieldPath, calculatedNamespace)
}
return sharedValidator.Validate(ctx)
} }
// validatePVCs validates the PVC configurations. // validatePVCs validates the PVC configurations.
......
...@@ -40,11 +40,15 @@ const ( ...@@ -40,11 +40,15 @@ const (
// DynamoGraphDeploymentHandler is a handler for validating DynamoGraphDeployment resources. // DynamoGraphDeploymentHandler is a handler for validating DynamoGraphDeployment resources.
// It is a thin wrapper around DynamoGraphDeploymentValidator. // It is a thin wrapper around DynamoGraphDeploymentValidator.
type DynamoGraphDeploymentHandler struct{} type DynamoGraphDeploymentHandler struct {
mgr manager.Manager
}
// NewDynamoGraphDeploymentHandler creates a new handler for DynamoGraphDeployment Webhook. // NewDynamoGraphDeploymentHandler creates a new handler for DynamoGraphDeployment Webhook.
func NewDynamoGraphDeploymentHandler() *DynamoGraphDeploymentHandler { func NewDynamoGraphDeploymentHandler(mgr manager.Manager) *DynamoGraphDeploymentHandler {
return &DynamoGraphDeploymentHandler{} return &DynamoGraphDeploymentHandler{
mgr: mgr,
}
} }
// ValidateCreate validates a DynamoGraphDeployment create request. // ValidateCreate validates a DynamoGraphDeployment create request.
...@@ -58,9 +62,9 @@ func (h *DynamoGraphDeploymentHandler) ValidateCreate(ctx context.Context, obj r ...@@ -58,9 +62,9 @@ func (h *DynamoGraphDeploymentHandler) ValidateCreate(ctx context.Context, obj r
logger.Info("validate create", "name", deployment.Name, "namespace", deployment.Namespace) logger.Info("validate create", "name", deployment.Name, "namespace", deployment.Namespace)
// Create validator and perform validation // Create validator with manager for API group detection and perform validation
validator := NewDynamoGraphDeploymentValidator(deployment) validator := NewDynamoGraphDeploymentValidatorWithManager(deployment, h.mgr)
return validator.Validate() return validator.Validate(ctx)
} }
// ValidateUpdate validates a DynamoGraphDeployment update request. // ValidateUpdate validates a DynamoGraphDeployment update request.
...@@ -85,11 +89,11 @@ func (h *DynamoGraphDeploymentHandler) ValidateUpdate(ctx context.Context, oldOb ...@@ -85,11 +89,11 @@ func (h *DynamoGraphDeploymentHandler) ValidateUpdate(ctx context.Context, oldOb
return nil, err return nil, err
} }
// Create validator and perform validation // Create validator with manager for API group detection and perform validation
validator := NewDynamoGraphDeploymentValidator(newDeployment) validator := NewDynamoGraphDeploymentValidatorWithManager(newDeployment, h.mgr)
// Validate stateless rules // Validate stateless rules
warnings, err := validator.Validate() warnings, err := validator.Validate(ctx)
if err != nil { if err != nil {
return warnings, err return warnings, err
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package validation package validation
import ( import (
"context"
"sort" "sort"
"strings" "strings"
"testing" "testing"
...@@ -511,7 +512,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) { ...@@ -511,7 +512,7 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
validator := NewDynamoGraphDeploymentValidator(tt.deployment) validator := NewDynamoGraphDeploymentValidator(tt.deployment)
_, err := validator.Validate() _, err := validator.Validate(context.Background())
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("DynamoGraphDeploymentValidator.Validate() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("DynamoGraphDeploymentValidator.Validate() error = %v, wantErr %v", err, tt.wantErr)
......
...@@ -18,9 +18,14 @@ ...@@ -18,9 +18,14 @@
package validation package validation
import ( import (
"context"
"fmt" "fmt"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
controllercommon "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo/epp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission" "sigs.k8s.io/controller-runtime/pkg/webhook/admission"
) )
...@@ -29,8 +34,9 @@ import ( ...@@ -29,8 +34,9 @@ import (
// to provide consistent validation logic for shared spec fields. // to provide consistent validation logic for shared spec fields.
type SharedSpecValidator struct { type SharedSpecValidator struct {
spec *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec spec *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec
fieldPath string // e.g., "spec" for DCD, "spec.services[foo]" for DGD fieldPath string // e.g., "spec" for DCD, "spec.services[foo]" for DGD
calculatedNamespace string // The namespace that will be used: {k8s_namespace}-{dgd_name} calculatedNamespace string // The namespace that will be used: {k8s_namespace}-{dgd_name}
mgr ctrl.Manager // Optional: for API group detection via discovery client
} }
// NewSharedSpecValidator creates a new validator for DynamoComponentDeploymentSharedSpec. // NewSharedSpecValidator creates a new validator for DynamoComponentDeploymentSharedSpec.
...@@ -43,12 +49,25 @@ func NewSharedSpecValidator(spec *nvidiacomv1alpha1.DynamoComponentDeploymentSha ...@@ -43,12 +49,25 @@ func NewSharedSpecValidator(spec *nvidiacomv1alpha1.DynamoComponentDeploymentSha
spec: spec, spec: spec,
fieldPath: fieldPath, fieldPath: fieldPath,
calculatedNamespace: calculatedNamespace, calculatedNamespace: calculatedNamespace,
mgr: nil,
}
}
// NewSharedSpecValidatorWithManager creates a validator with a manager for API group detection.
// This allows the validator to check for API group availability (e.g., inference.networking.k8s.io) when validating EPP components.
func NewSharedSpecValidatorWithManager(spec *nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec, fieldPath string, calculatedNamespace string, mgr ctrl.Manager) *SharedSpecValidator {
return &SharedSpecValidator{
spec: spec,
fieldPath: fieldPath,
calculatedNamespace: calculatedNamespace,
mgr: mgr,
} }
} }
// Validate performs validation on the shared spec fields. // Validate performs validation on the shared spec fields.
// Context is required for any operations that may need to query the cluster (e.g., CRD checks).
// Returns warnings (e.g., deprecation notices) and error if validation fails. // Returns warnings (e.g., deprecation notices) and error if validation fails.
func (v *SharedSpecValidator) Validate() (admission.Warnings, error) { func (v *SharedSpecValidator) Validate(ctx context.Context) (admission.Warnings, error) {
// Collect warnings (e.g., deprecation notices) // Collect warnings (e.g., deprecation notices)
var warnings admission.Warnings var warnings admission.Warnings
...@@ -93,6 +112,11 @@ func (v *SharedSpecValidator) Validate() (admission.Warnings, error) { ...@@ -93,6 +112,11 @@ func (v *SharedSpecValidator) Validate() (admission.Warnings, error) {
v.fieldPath)) v.fieldPath))
} }
// Validate EPP-specific constraints
if err := v.validateEPPConfig(ctx); err != nil {
return nil, err
}
return warnings, nil return warnings, nil
} }
...@@ -131,3 +155,72 @@ func (v *SharedSpecValidator) validateSharedMemory() error { ...@@ -131,3 +155,72 @@ func (v *SharedSpecValidator) validateSharedMemory() error {
} }
return nil return nil
} }
// validateEPPConfig enforces the constraints that apply only to EPP components.
// Specs whose ComponentType is not consts.ComponentTypeEPP pass unconditionally.
//
// Checks run in a fixed order and the first failure is returned:
//  1. InferencePool API availability (only when a manager was supplied)
//  2. the component must not be multinode
//  3. replicas, when set, must equal 1
//  4. eppConfig must be present
//  5. exactly one of eppConfig.configMapRef / eppConfig.config must be set
//  6. a supplied configMapRef must carry a non-empty name
//
// Every returned error is prefixed with the validator's fieldPath.
func (v *SharedSpecValidator) validateEPPConfig(ctx context.Context) error {
	// Nothing to enforce for non-EPP components.
	if v.spec.ComponentType != consts.ComponentTypeEPP {
		return nil
	}

	// The cluster-side check is only possible when a manager is available.
	if v.mgr != nil {
		if err := v.checkInferencePoolAPIAvailability(ctx); err != nil {
			return fmt.Errorf("%s: cannot deploy EPP component: %w", v.fieldPath, err)
		}
	}

	// EPP is restricted to a single node.
	if v.spec.IsMultinode() {
		return fmt.Errorf("%s: EPP component cannot be multinode (multinode field must be nil or nodeCount must be 1)", v.fieldPath)
	}

	// An unset replica count is accepted; an explicit one must be exactly 1.
	if replicas := v.spec.Replicas; replicas != nil && *replicas != 1 {
		return fmt.Errorf("%s: EPP component must have exactly 1 replica (found %d replicas)", v.fieldPath, *replicas)
	}

	// EPP components must carry an eppConfig section.
	eppCfg := v.spec.EPPConfig
	if eppCfg == nil {
		return fmt.Errorf("%s.eppConfig is required for EPP components", v.fieldPath)
	}

	hasConfigMapRef := eppCfg.ConfigMapRef != nil
	hasInlineConfig := eppCfg.Config != nil

	switch {
	case !hasConfigMapRef && !hasInlineConfig:
		// There is no default configuration to fall back on.
		return fmt.Errorf("%s.eppConfig: either configMapRef or config must be specified (no default configuration provided)", v.fieldPath)
	case hasConfigMapRef && hasInlineConfig:
		// The two configuration sources are mutually exclusive.
		return fmt.Errorf("%s.eppConfig: configMapRef and config are mutually exclusive, only one can be specified", v.fieldPath)
	case hasConfigMapRef && eppCfg.ConfigMapRef.Name == "":
		// A reference without a name cannot be resolved.
		return fmt.Errorf("%s.eppConfig.configMapRef.name is required", v.fieldPath)
	}

	return nil
}
// checkInferencePoolAPIAvailability reports whether EPP can be deployed with respect to
// the InferencePool API group (epp.InferencePoolGroup).
//
// It returns nil when the validator has no manager (the check is skipped) or when
// controllercommon.DetectInferencePoolAvailability reports the API as available.
// Otherwise it returns an error telling the user to install the Gateway API
// Inference Extension.
func (v *SharedSpecValidator) checkInferencePoolAPIAvailability(ctx context.Context) error {
	// Short-circuit keeps the detection call from running without a manager.
	if v.mgr == nil || controllercommon.DetectInferencePoolAvailability(ctx, v.mgr) {
		return nil
	}

	return fmt.Errorf(
		"InferencePool API group (%s) is not available in the cluster. "+
			"EPP requires the Gateway API Inference Extension to be installed. "+
			"Please install the Gateway API Inference Extension before deploying EPP components",
		epp.InferencePoolGroup)
}
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package validation package validation
import ( import (
"context"
"testing" "testing"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
...@@ -219,7 +220,7 @@ func TestSharedSpecValidator_Validate(t *testing.T) { ...@@ -219,7 +220,7 @@ func TestSharedSpecValidator_Validate(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
validator := NewSharedSpecValidator(tt.spec, tt.fieldPath, tt.calculatedNamespace) validator := NewSharedSpecValidator(tt.spec, tt.fieldPath, tt.calculatedNamespace)
_, err := validator.Validate() _, err := validator.Validate(context.Background())
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("SharedSpecValidator.Validate() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("SharedSpecValidator.Validate() error = %v, wantErr %v", err, tt.wantErr)
...@@ -284,7 +285,7 @@ func TestSharedSpecValidator_Validate_Warnings(t *testing.T) { ...@@ -284,7 +285,7 @@ func TestSharedSpecValidator_Validate_Warnings(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
validator := NewSharedSpecValidator(tt.spec, tt.fieldPath, tt.calculatedNamespace) validator := NewSharedSpecValidator(tt.spec, tt.fieldPath, tt.calculatedNamespace)
warnings, err := validator.Validate() warnings, err := validator.Validate(context.Background())
if err != nil { if err != nil {
t.Errorf("SharedSpecValidator.Validate() unexpected error = %v", err) t.Errorf("SharedSpecValidator.Validate() unexpected error = %v", err)
......
...@@ -202,6 +202,7 @@ _Appears in:_ ...@@ -202,6 +202,7 @@ _Appears in:_
| `replicas` _integer_ | Replicas is the desired number of Pods for this component.<br />When scalingAdapter is enabled, this field is managed by the<br />DynamoGraphDeploymentScalingAdapter and should not be modified directly. | | Minimum: 0 <br /> | | `replicas` _integer_ | Replicas is the desired number of Pods for this component.<br />When scalingAdapter is enabled, this field is managed by the<br />DynamoGraphDeploymentScalingAdapter and should not be modified directly. | | Minimum: 0 <br /> |
| `multinode` _[MultinodeSpec](#multinodespec)_ | Multinode is the configuration for multinode components. | | | | `multinode` _[MultinodeSpec](#multinodespec)_ | Multinode is the configuration for multinode components. | | |
| `scalingAdapter` _[ScalingAdapter](#scalingadapter)_ | ScalingAdapter configures whether this service uses the DynamoGraphDeploymentScalingAdapter.<br />When enabled, replicas are managed via DGDSA and external autoscalers can scale<br />the service using the Scale subresource. When disabled, replicas can be modified directly. | | | | `scalingAdapter` _[ScalingAdapter](#scalingadapter)_ | ScalingAdapter configures whether this service uses the DynamoGraphDeploymentScalingAdapter.<br />When enabled, replicas are managed via DGDSA and external autoscalers can scale<br />the service using the Scale subresource. When disabled, replicas can be modified directly. | | |
| `eppConfig` _[EPPConfig](#eppconfig)_ | EPPConfig defines EPP-specific configuration options for Endpoint Picker Plugin components.<br />Only applicable when ComponentType is "epp". | | |
#### DynamoComponentDeploymentSpec #### DynamoComponentDeploymentSpec
...@@ -240,6 +241,7 @@ _Appears in:_ ...@@ -240,6 +241,7 @@ _Appears in:_
| `replicas` _integer_ | Replicas is the desired number of Pods for this component.<br />When scalingAdapter is enabled, this field is managed by the<br />DynamoGraphDeploymentScalingAdapter and should not be modified directly. | | Minimum: 0 <br /> | | `replicas` _integer_ | Replicas is the desired number of Pods for this component.<br />When scalingAdapter is enabled, this field is managed by the<br />DynamoGraphDeploymentScalingAdapter and should not be modified directly. | | Minimum: 0 <br /> |
| `multinode` _[MultinodeSpec](#multinodespec)_ | Multinode is the configuration for multinode components. | | | | `multinode` _[MultinodeSpec](#multinodespec)_ | Multinode is the configuration for multinode components. | | |
| `scalingAdapter` _[ScalingAdapter](#scalingadapter)_ | ScalingAdapter configures whether this service uses the DynamoGraphDeploymentScalingAdapter.<br />When enabled, replicas are managed via DGDSA and external autoscalers can scale<br />the service using the Scale subresource. When disabled, replicas can be modified directly. | | | | `scalingAdapter` _[ScalingAdapter](#scalingadapter)_ | ScalingAdapter configures whether this service uses the DynamoGraphDeploymentScalingAdapter.<br />When enabled, replicas are managed via DGDSA and external autoscalers can scale<br />the service using the Scale subresource. When disabled, replicas can be modified directly. | | |
| `eppConfig` _[EPPConfig](#eppconfig)_ | EPPConfig defines EPP-specific configuration options for Endpoint Picker Plugin components.<br />Only applicable when ComponentType is "epp". | | |
#### DynamoGraphDeployment #### DynamoGraphDeployment
...@@ -513,6 +515,25 @@ _Appears in:_ ...@@ -513,6 +515,25 @@ _Appears in:_
| `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#condition-v1-meta) array_ | Conditions represents the latest available observations of the model's state | | | | `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#condition-v1-meta) array_ | Conditions represents the latest available observations of the model's state | | |
#### EPPConfig
EPPConfig contains configuration for EPP (Endpoint Picker Plugin) components.
EPP is responsible for intelligent endpoint selection and KV-aware routing.
_Appears in:_
- [DynamoComponentDeploymentSharedSpec](#dynamocomponentdeploymentsharedspec)
- [DynamoComponentDeploymentSpec](#dynamocomponentdeploymentspec)
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `configMapRef` _[ConfigMapKeySelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#configmapkeyselector-v1-core)_ | ConfigMapRef references a user-provided ConfigMap containing EPP configuration.<br />The ConfigMap should contain EndpointPickerConfig YAML.<br />Mutually exclusive with Config. | | |
| `config` _[EndpointPickerConfig](#endpointpickerconfig)_ | Config allows specifying EPP EndpointPickerConfig directly as a structured object.<br />The operator will marshal this to YAML and create a ConfigMap automatically.<br />Mutually exclusive with ConfigMapRef.<br />One of ConfigMapRef or Config must be specified (no default configuration).<br />Uses the upstream type from github.com/kubernetes-sigs/gateway-api-inference-extension | | Type: object <br /> |
#### EndpointInfo #### EndpointInfo
......
...@@ -8,6 +8,7 @@ metadata: ...@@ -8,6 +8,7 @@ metadata:
spec: spec:
services: services:
Frontend: Frontend:
envFromSecret: hf-token-secret
componentType: frontend componentType: frontend
replicas: 1 replicas: 1
extraPodSpec: extraPodSpec:
......
...@@ -57,6 +57,18 @@ pub enum DynamoLlmResult { ...@@ -57,6 +57,18 @@ pub enum DynamoLlmResult {
ERR = 1, ERR = 1,
} }
/// Default timeout for discovery sync (seconds).
const DEFAULT_DISCOVERY_TIMEOUT_SEC: u64 = 10;

/// Get the discovery sync timeout, in seconds.
///
/// Reads the `DYN_DISCOVERY_TIMEOUT_SEC` environment variable and parses it as a
/// `u64`. Falls back to `DEFAULT_DISCOVERY_TIMEOUT_SEC` when the variable is
/// unset, is not valid Unicode, or does not parse as an unsigned integer.
fn get_discovery_timeout_secs() -> u64 {
    std::env::var("DYN_DISCOVERY_TIMEOUT_SEC")
        .ok()
        // Trim before parsing: `u64::from_str` rejects surrounding whitespace, so a
        // value such as "30\n" would otherwise be silently replaced by the default.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_DISCOVERY_TIMEOUT_SEC)
}
/// Wait for the discovery daemon to sync and return at least one instance. /// Wait for the discovery daemon to sync and return at least one instance.
/// This ensures list() calls will have data available. /// This ensures list() calls will have data available.
/// Returns the number of instances found, or 0 if timed out. /// Returns the number of instances found, or 0 if timed out.
...@@ -120,7 +132,8 @@ pub unsafe extern "C" fn dynamo_llm_init( ...@@ -120,7 +132,8 @@ pub unsafe extern "C" fn dynamo_llm_init(
// This is needed because dynamo_create_worker_selection_pipeline() is called // This is needed because dynamo_create_worker_selection_pipeline() is called
// immediately after, and it needs discovery.list() to return data // immediately after, and it needs discovery.list() to return data
// the discovery daemon takes time to query K8s and returns async, so we need to wait. // the discovery daemon takes time to query K8s and returns async, so we need to wait.
let instance_count = wait_for_discovery_sync(drt, 10).await; let timeout_secs = get_discovery_timeout_secs();
let instance_count = wait_for_discovery_sync(drt, timeout_secs).await;
if instance_count == 0 { if instance_count == 0 {
tracing::error!( tracing::error!(
"Discovery sync failed: no worker instances found. Is the backend running?" "Discovery sync failed: no worker instances found. Is the backend running?"
...@@ -1361,7 +1374,8 @@ pub async fn create_worker_selection_pipeline_chat( ...@@ -1361,7 +1374,8 @@ pub async fn create_worker_selection_pipeline_chat(
// Only wait for discovery sync if we just initialized the DRT // Only wait for discovery sync if we just initialized the DRT
// (dynamo_llm_init already does this when it initializes) // (dynamo_llm_init already does this when it initializes)
if needs_sync { if needs_sync {
let instance_count = wait_for_discovery_sync(distributed_runtime, 10).await; let timeout_secs = get_discovery_timeout_secs();
let instance_count = wait_for_discovery_sync(distributed_runtime, timeout_secs).await;
if instance_count == 0 { if instance_count == 0 {
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
"Discovery sync failed: no worker instances found. Is the backend running?" "Discovery sync failed: no worker instances found. Is the backend running?"
......
...@@ -84,9 +84,9 @@ spec: ...@@ -84,9 +84,9 @@ spec:
value: "nats://dynamo-platform-nats.$(PLATFORM_NAMESPACE):4222" # update dynamo-platform to appropriate namespace value: "nats://dynamo-platform-nats.$(PLATFORM_NAMESPACE):4222" # update dynamo-platform to appropriate namespace
- name: DYNAMO_NAMESPACE - name: DYNAMO_NAMESPACE
value: "$(POD_NAMESPACE)-llama3-70b-agg" value: "$(POD_NAMESPACE)-llama3-70b-agg"
- name: DYNAMO_MODEL - name: DYN_MODEL
value: "RedHatAI/Llama-3.3-70B-Instruct-FP8-dynamic" value: "RedHatAI/Llama-3.3-70B-Instruct-FP8-dynamic"
- name: DYNAMO_KV_BLOCK_SIZE - name: DYN_KV_BLOCK_SIZE
value: "128" # UPDATE to match the --block-size in your deploy.yaml engine command value: "128" # UPDATE to match the --block-size in your deploy.yaml engine command
- name: USE_STREAMING - name: USE_STREAMING
value: "true" value: "true"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment