Unverified Commit 9e6972a5 authored by julienmancuso's avatar julienmancuso Committed by GitHub
Browse files

feat: Auto-inject kai-scheduler annotations and label (#2748)


Signed-off-by: default avatarJulien Mancuso <jmancuso@nvidia.com>
parent 700d345d
...@@ -137,6 +137,13 @@ rules: ...@@ -137,6 +137,13 @@ rules:
- get - get
- patch - patch
- update - update
- apiGroups:
- scheduling.run.ai
resources:
- queues
verbs:
- get
- list
- apiGroups: - apiGroups:
- apps - apps
resources: resources:
...@@ -475,6 +482,45 @@ roleRef: ...@@ -475,6 +482,45 @@ roleRef:
{{- end }} {{- end }}
name: '{{ include "dynamo-operator.fullname" . }}-manager-role' name: '{{ include "dynamo-operator.fullname" . }}-manager-role'
subjects: subjects:
- kind: ServiceAccount
name: '{{ include "dynamo-operator.fullname" . }}-controller-manager'
namespace: '{{ .Release.Namespace }}'
---
# ClusterRole for kai-scheduler queue access
# This is always a ClusterRole since Queue resources are cluster-scoped
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ include "dynamo-operator.fullname" . }}-queue-reader
labels:
app.kubernetes.io/component: rbac
app.kubernetes.io/created-by: dynamo-operator
app.kubernetes.io/part-of: dynamo-operator
{{- include "dynamo-operator.labels" . | nindent 4 }}
rules:
- apiGroups:
- scheduling.run.ai
resources:
- queues
verbs:
- get
- list
---
# ClusterRoleBinding for kai-scheduler queue access
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {{ include "dynamo-operator.fullname" . }}-queue-reader-binding
labels:
app.kubernetes.io/component: rbac
app.kubernetes.io/created-by: dynamo-operator
app.kubernetes.io/part-of: dynamo-operator
{{- include "dynamo-operator.labels" . | nindent 4 }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: {{ include "dynamo-operator.fullname" . }}-queue-reader
subjects:
- kind: ServiceAccount - kind: ServiceAccount
name: '{{ include "dynamo-operator.fullname" . }}-controller-manager' name: '{{ include "dynamo-operator.fullname" . }}-controller-manager'
namespace: '{{ .Release.Namespace }}' namespace: '{{ .Release.Namespace }}'
\ No newline at end of file
...@@ -172,6 +172,9 @@ func main() { ...@@ -172,6 +172,9 @@ func main() {
LWS: commonController.LWSConfig{ LWS: commonController.LWSConfig{
Enabled: false, // Will be set after LWS discovery Enabled: false, // Will be set after LWS discovery
}, },
KaiScheduler: commonController.KaiSchedulerConfig{
Enabled: false, // Will be set after Kai-scheduler discovery
},
EtcdAddress: etcdAddr, EtcdAddress: etcdAddr,
NatsAddress: natsAddr, NatsAddress: natsAddr,
IngressConfig: commonController.IngressConfig{ IngressConfig: commonController.IngressConfig{
...@@ -247,6 +250,11 @@ func main() { ...@@ -247,6 +250,11 @@ func main() {
lwsEnabled := commonController.DetectLWSAvailability(mainCtx, mgr) lwsEnabled := commonController.DetectLWSAvailability(mainCtx, mgr)
ctrlConfig.LWS.Enabled = lwsEnabled ctrlConfig.LWS.Enabled = lwsEnabled
// Detect Kai-scheduler availability using discovery client
setupLog.Info("Detecting Kai-scheduler availability...")
kaiSchedulerEnabled := commonController.DetectKaiSchedulerAvailability(mainCtx, mgr)
ctrlConfig.KaiScheduler.Enabled = kaiSchedulerEnabled
// Create etcd client // Create etcd client
cli, err := clientv3.New(clientv3.Config{ cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr}, Endpoints: []string{etcdAddr},
......
...@@ -185,6 +185,13 @@ rules: ...@@ -185,6 +185,13 @@ rules:
- get - get
- patch - patch
- update - update
- apiGroups:
- scheduling.run.ai
resources:
- queues
verbs:
- get
- list
- apiGroups: - apiGroups:
- scheduling.volcano.sh - scheduling.volcano.sh
resources: resources:
......
package consts package consts
import "time" import (
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const ( const (
HPACPUDefaultAverageUtilization = 80 HPACPUDefaultAverageUtilization = 80
...@@ -55,6 +59,12 @@ const ( ...@@ -55,6 +59,12 @@ const (
DefaultSharedMemoryMountPath = "/dev/shm" DefaultSharedMemoryMountPath = "/dev/shm"
DefaultSharedMemorySize = "8Gi" DefaultSharedMemorySize = "8Gi"
// Kai-scheduler related constants
KubeAnnotationKaiSchedulerQueue = "nvidia.com/kai-scheduler-queue" // User-provided annotation to specify queue name
KubeLabelKaiSchedulerQueue = "kai.scheduler/queue" // Label injected into pods for kai-scheduler
KaiSchedulerName = "kai-scheduler" // Scheduler name for kai-scheduler
DefaultKaiSchedulerQueue = "dynamo" // Default queue name when none specified
// Grove multinode role suffixes // Grove multinode role suffixes
GroveRoleSuffixLeader = "ldr" GroveRoleSuffixLeader = "ldr"
GroveRoleSuffixWorker = "wkr" GroveRoleSuffixWorker = "wkr"
...@@ -68,3 +78,25 @@ const ( ...@@ -68,3 +78,25 @@ const (
MultinodeDeploymentTypeGrove MultinodeDeploymentType = "grove" MultinodeDeploymentTypeGrove MultinodeDeploymentType = "grove"
MultinodeDeploymentTypeLWS MultinodeDeploymentType = "lws" MultinodeDeploymentTypeLWS MultinodeDeploymentType = "lws"
) )
// GroupVersionResources for external APIs
var (
// Grove GroupVersionResources for scaling operations
PodCliqueGVR = schema.GroupVersionResource{
Group: "grove.io",
Version: "v1alpha1",
Resource: "podcliques",
}
PodCliqueScalingGroupGVR = schema.GroupVersionResource{
Group: "grove.io",
Version: "v1alpha1",
Resource: "podcliquescalinggroups",
}
// KAI-Scheduler GroupVersionResource for queue validation
QueueGVR = schema.GroupVersionResource{
Group: "scheduling.run.ai",
Version: "v2",
Resource: "queues",
}
)
...@@ -55,20 +55,6 @@ const ( ...@@ -55,20 +55,6 @@ const (
PendingState State = "pending" PendingState State = "pending"
) )
var (
// Grove GroupVersionResources for scaling operations
podCliqueGVR = schema.GroupVersionResource{
Group: "grove.io",
Version: "v1alpha1",
Resource: "podcliques",
}
podCliqueScalingGroupGVR = schema.GroupVersionResource{
Group: "grove.io",
Version: "v1alpha1",
Resource: "podcliquescalinggroups",
}
)
type etcdStorage interface { type etcdStorage interface {
DeleteKeys(ctx context.Context, prefix string) error DeleteKeys(ctx context.Context, prefix string) error
} }
...@@ -88,6 +74,7 @@ type DynamoGraphDeploymentReconciler struct { ...@@ -88,6 +74,7 @@ type DynamoGraphDeploymentReconciler struct {
// +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch // +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch // +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list
// Reconcile is part of the main kubernetes reconciliation loop which aims to // Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state. // move the current state of the cluster closer to the desired state.
...@@ -201,9 +188,9 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context ...@@ -201,9 +188,9 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context
var gvr schema.GroupVersionResource var gvr schema.GroupVersionResource
switch resourceType { switch resourceType {
case "PodClique": case "PodClique":
gvr = podCliqueGVR gvr = consts.PodCliqueGVR
case "PodCliqueScalingGroup": case "PodCliqueScalingGroup":
gvr = podCliqueScalingGroupGVR gvr = consts.PodCliqueScalingGroupGVR
default: default:
return fmt.Errorf("unsupported Grove resource type: %s", resourceType) return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
} }
......
...@@ -42,11 +42,17 @@ type LWSConfig struct { ...@@ -42,11 +42,17 @@ type LWSConfig struct {
Enabled bool Enabled bool
} }
type KaiSchedulerConfig struct {
// Enabled is automatically determined by checking if Kai-scheduler CRDs are installed in the cluster
Enabled bool
}
type Config struct { type Config struct {
// Enable resources filtering, only the resources belonging to the given namespace will be handled. // Enable resources filtering, only the resources belonging to the given namespace will be handled.
RestrictedNamespace string RestrictedNamespace string
Grove GroveConfig Grove GroveConfig
LWS LWSConfig LWS LWSConfig
KaiScheduler KaiSchedulerConfig
EtcdAddress string EtcdAddress string
NatsAddress string NatsAddress string
IngressConfig IngressConfig IngressConfig IngressConfig
...@@ -75,6 +81,12 @@ func DetectLWSAvailability(ctx context.Context, mgr ctrl.Manager) bool { ...@@ -75,6 +81,12 @@ func DetectLWSAvailability(ctx context.Context, mgr ctrl.Manager) bool {
return detectAPIGroupAvailability(ctx, mgr, "leaderworkerset.x-k8s.io") return detectAPIGroupAvailability(ctx, mgr, "leaderworkerset.x-k8s.io")
} }
// DetectKaiSchedulerAvailability checks if Kai-scheduler is available by checking if the scheduling.run.ai API group is registered
// This approach uses the discovery client which is simpler and more reliable
func DetectKaiSchedulerAvailability(ctx context.Context, mgr ctrl.Manager) bool {
return detectAPIGroupAvailability(ctx, mgr, "scheduling.run.ai")
}
// detectAPIGroupAvailability checks if a specific API group is registered in the cluster // detectAPIGroupAvailability checks if a specific API group is registered in the cluster
func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName string) bool { func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName string) bool {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
...@@ -107,6 +119,7 @@ func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName ...@@ -107,6 +119,7 @@ func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName
logger.Info("API group not available", "group", groupName) logger.Info("API group not available", "group", groupName)
return false return false
} }
func EphemeralDeploymentEventFilter(config Config) predicate.Predicate { func EphemeralDeploymentEventFilter(config Config) predicate.Predicate {
return predicate.NewPredicateFuncs(func(o client.Object) bool { return predicate.NewPredicateFuncs(func(o client.Object) bool {
l := log.FromContext(context.Background()) l := log.FromContext(context.Background())
......
...@@ -882,6 +882,17 @@ func GenerateGrovePodGangSet( ...@@ -882,6 +882,17 @@ func GenerateGrovePodGangSet(
if controllerConfig.Grove.TerminationDelay > 0 { if controllerConfig.Grove.TerminationDelay > 0 {
gangSet.Spec.Template.TerminationDelay = &metav1.Duration{Duration: controllerConfig.Grove.TerminationDelay} gangSet.Spec.Template.TerminationDelay = &metav1.Duration{Duration: controllerConfig.Grove.TerminationDelay}
} }
// Validate kai-scheduler queue once if kai-scheduler is enabled
var validatedQueueName string
if controllerConfig.Grove.Enabled && controllerConfig.KaiScheduler.Enabled {
var err error
validatedQueueName, err = DetermineKaiSchedulerQueue(ctx, dynamoDeployment.Annotations)
if err != nil {
return nil, fmt.Errorf("failed to determine kai-scheduler queue: %w", err)
}
}
dynamoNamespace, err := getDynamoNamespace(dynamoDeployment) dynamoNamespace, err := getDynamoNamespace(dynamoDeployment)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get the graph dynamo namespace: %w", err) return nil, fmt.Errorf("failed to get the graph dynamo namespace: %w", err)
...@@ -935,6 +946,10 @@ func GenerateGrovePodGangSet( ...@@ -935,6 +946,10 @@ func GenerateGrovePodGangSet(
return nil, fmt.Errorf("failed to generate annotations: %w", err) return nil, fmt.Errorf("failed to generate annotations: %w", err)
} }
clique.Annotations = annotations clique.Annotations = annotations
// Inject kai-scheduler settings if enabled
injectKaiSchedulerIfEnabled(clique, controllerConfig, validatedQueueName)
gangSet.Spec.Template.Cliques = append(gangSet.Spec.Template.Cliques, clique) gangSet.Spec.Template.Cliques = append(gangSet.Spec.Template.Cliques, clique)
cliqueNames = append(cliqueNames, strings.ToLower(r.Name)) cliqueNames = append(cliqueNames, strings.ToLower(r.Name))
} }
......
...@@ -14,6 +14,10 @@ import ( ...@@ -14,6 +14,10 @@ import (
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts" commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
ctrl "sigs.k8s.io/controller-runtime"
) )
type GroveMultinodeDeployer struct { type GroveMultinodeDeployer struct {
...@@ -130,3 +134,95 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam ...@@ -130,3 +134,95 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam
return true, "" return true, ""
} }
// resolveKaiSchedulerQueueName extracts the queue name from annotations or returns default
// This is the shared logic between DetermineKaiSchedulerQueue and ResolveKaiSchedulerQueue
func resolveKaiSchedulerQueueName(annotations map[string]string) string {
queueName := commonconsts.DefaultKaiSchedulerQueue
if annotations != nil {
if annotationQueue, exists := annotations[commonconsts.KubeAnnotationKaiSchedulerQueue]; exists && strings.TrimSpace(annotationQueue) != "" {
queueName = strings.TrimSpace(annotationQueue)
}
}
return queueName
}
// ensureQueueExists validates that a Queue resource with the given name exists in the cluster
// Returns an error if the queue doesn't exist or if validation fails
func ensureQueueExists(ctx context.Context, dynamicClient dynamic.Interface, queueName string) error {
logger := log.FromContext(ctx)
// Try to get the queue resource using the predefined GVR
_, err := dynamicClient.Resource(commonconsts.QueueGVR).Get(ctx, queueName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
logger.Error(err, "Queue not found", "queueName", queueName)
return fmt.Errorf("queue '%s' not found in cluster. Ensure the queue exists before using kai-scheduler", queueName)
}
logger.Error(err, "Failed to validate queue", "queueName", queueName)
return fmt.Errorf("failed to validate queue '%s': %w", queueName, err)
}
logger.Info("Queue validation successful", "queueName", queueName)
return nil
}
// DetermineKaiSchedulerQueue determines the queue name for kai-scheduler from deployment annotations or returns default
// Also validates that the queue exists in the cluster
func DetermineKaiSchedulerQueue(ctx context.Context, annotations map[string]string) (string, error) {
// Get the queue name from annotation or use default
queueName := resolveKaiSchedulerQueueName(annotations)
// Create a dynamic client for CRD validation (Queue CRD might not be in the standard client scheme)
cfg, err := ctrl.GetConfig()
if err != nil {
return "", fmt.Errorf("failed to get kubernetes config for queue validation: %w", err)
}
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
return "", fmt.Errorf("failed to create dynamic client for queue validation: %w", err)
}
// Validate that the queue exists
if err := ensureQueueExists(ctx, dynamicClient, queueName); err != nil {
return "", fmt.Errorf("kai-scheduler queue validation failed: %w", err)
}
return queueName, nil
}
// ResolveKaiSchedulerQueue determines the queue name for kai-scheduler from deployment annotations or returns default
// Does NOT validate - use DetermineKaiSchedulerQueue for validation
func ResolveKaiSchedulerQueue(annotations map[string]string) string {
return resolveKaiSchedulerQueueName(annotations)
}
// injectKaiSchedulerIfEnabled injects kai-scheduler settings into a clique if kai-scheduler is enabled and grove is enabled
func injectKaiSchedulerIfEnabled(
clique *grovev1alpha1.PodCliqueTemplateSpec,
controllerConfig controller_common.Config,
validatedQueueName string,
) {
// Only proceed if grove is enabled, kai-scheduler is enabled, and no manual schedulerName is set
if !controllerConfig.Grove.Enabled || !controllerConfig.KaiScheduler.Enabled {
return
}
// Check if user has manually set schedulerName - if so, respect their choice
if clique.Spec.PodSpec.SchedulerName != "" && clique.Spec.PodSpec.SchedulerName != commonconsts.KaiSchedulerName {
return
}
// Use the pre-validated queue name
queueName := validatedQueueName
// Inject schedulerName
clique.Spec.PodSpec.SchedulerName = commonconsts.KaiSchedulerName
// Inject queue label
if clique.Labels == nil {
clique.Labels = make(map[string]string)
}
clique.Labels[commonconsts.KubeLabelKaiSchedulerQueue] = queueName
}
package dynamo
import (
"context"
"strings"
"testing"
grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
dynamicfake "k8s.io/client-go/dynamic/fake"
)
func TestResolveKaiSchedulerQueueName(t *testing.T) {
tests := []struct {
name string
annotations map[string]string
expected string
}{
{
name: "nil annotations",
annotations: nil,
expected: commonconsts.DefaultKaiSchedulerQueue,
},
{
name: "empty annotations",
annotations: map[string]string{},
expected: commonconsts.DefaultKaiSchedulerQueue,
},
{
name: "no kai-scheduler annotation",
annotations: map[string]string{
"other-annotation": "value",
},
expected: commonconsts.DefaultKaiSchedulerQueue,
},
{
name: "empty kai-scheduler annotation",
annotations: map[string]string{
commonconsts.KubeAnnotationKaiSchedulerQueue: "",
},
expected: commonconsts.DefaultKaiSchedulerQueue,
},
{
name: "custom queue name",
annotations: map[string]string{
commonconsts.KubeAnnotationKaiSchedulerQueue: "custom-queue",
},
expected: "custom-queue",
},
{
name: "whitespace is trimmed",
annotations: map[string]string{
commonconsts.KubeAnnotationKaiSchedulerQueue: " custom-queue ",
},
expected: "custom-queue",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := resolveKaiSchedulerQueueName(tt.annotations)
if result != tt.expected {
t.Errorf("resolveKaiSchedulerQueueName() = %v, expected %v", result, tt.expected)
}
})
}
}
func TestResolveKaiSchedulerQueue(t *testing.T) {
tests := []struct {
name string
annotations map[string]string
expected string
}{
{
name: "default queue",
annotations: nil,
expected: commonconsts.DefaultKaiSchedulerQueue,
},
{
name: "custom queue",
annotations: map[string]string{
commonconsts.KubeAnnotationKaiSchedulerQueue: "my-queue",
},
expected: "my-queue",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := ResolveKaiSchedulerQueue(tt.annotations)
if result != tt.expected {
t.Errorf("ResolveKaiSchedulerQueue() = %v, expected %v", result, tt.expected)
}
})
}
}
func TestInjectKaiSchedulerIfEnabled(t *testing.T) {
tests := []struct {
name string
controllerConfig controller_common.Config
validatedQueueName string
initialClique *grovev1alpha1.PodCliqueTemplateSpec
expectedScheduler string
expectedQueueLabel string
shouldInject bool
}{
{
name: "grove disabled - no injection",
controllerConfig: controller_common.Config{
Grove: controller_common.GroveConfig{Enabled: false},
KaiScheduler: controller_common.KaiSchedulerConfig{Enabled: true},
},
validatedQueueName: "test-queue",
initialClique: &grovev1alpha1.PodCliqueTemplateSpec{
Spec: grovev1alpha1.PodCliqueSpec{
PodSpec: corev1.PodSpec{},
},
},
shouldInject: false,
},
{
name: "kai-scheduler disabled - no injection",
controllerConfig: controller_common.Config{
Grove: controller_common.GroveConfig{Enabled: true},
KaiScheduler: controller_common.KaiSchedulerConfig{Enabled: false},
},
validatedQueueName: "test-queue",
initialClique: &grovev1alpha1.PodCliqueTemplateSpec{
Spec: grovev1alpha1.PodCliqueSpec{
PodSpec: corev1.PodSpec{},
},
},
shouldInject: false,
},
{
name: "manual scheduler set - no injection",
controllerConfig: controller_common.Config{
Grove: controller_common.GroveConfig{Enabled: true},
KaiScheduler: controller_common.KaiSchedulerConfig{Enabled: true},
},
validatedQueueName: "test-queue",
initialClique: &grovev1alpha1.PodCliqueTemplateSpec{
Spec: grovev1alpha1.PodCliqueSpec{
PodSpec: corev1.PodSpec{
SchedulerName: "manual-scheduler",
},
},
},
shouldInject: false,
},
{
name: "both enabled, no manual scheduler - inject",
controllerConfig: controller_common.Config{
Grove: controller_common.GroveConfig{Enabled: true},
KaiScheduler: controller_common.KaiSchedulerConfig{Enabled: true},
},
validatedQueueName: "test-queue",
initialClique: &grovev1alpha1.PodCliqueTemplateSpec{
Spec: grovev1alpha1.PodCliqueSpec{
PodSpec: corev1.PodSpec{},
},
},
expectedScheduler: commonconsts.KaiSchedulerName,
expectedQueueLabel: "test-queue",
shouldInject: true,
},
{
name: "inject with existing labels",
controllerConfig: controller_common.Config{
Grove: controller_common.GroveConfig{Enabled: true},
KaiScheduler: controller_common.KaiSchedulerConfig{Enabled: true},
},
validatedQueueName: "custom-queue",
initialClique: &grovev1alpha1.PodCliqueTemplateSpec{
Labels: map[string]string{
"existing-label": "existing-value",
},
Spec: grovev1alpha1.PodCliqueSpec{
PodSpec: corev1.PodSpec{},
},
},
expectedScheduler: commonconsts.KaiSchedulerName,
expectedQueueLabel: "custom-queue",
shouldInject: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Make a deep copy to avoid modifying the test case
clique := tt.initialClique.DeepCopy()
// Call the function
injectKaiSchedulerIfEnabled(clique, tt.controllerConfig, tt.validatedQueueName)
if tt.shouldInject {
// Verify scheduler name is injected
if clique.Spec.PodSpec.SchedulerName != tt.expectedScheduler {
t.Errorf("expected schedulerName %v, got %v", tt.expectedScheduler, clique.Spec.PodSpec.SchedulerName)
}
// Verify queue label is injected
if clique.Labels == nil {
t.Errorf("expected labels to be set, got nil")
} else {
queueLabel := clique.Labels[commonconsts.KubeLabelKaiSchedulerQueue]
if queueLabel != tt.expectedQueueLabel {
t.Errorf("expected queue label %v, got %v", tt.expectedQueueLabel, queueLabel)
}
}
// Verify existing labels are preserved
if tt.initialClique.Labels != nil {
for key, value := range tt.initialClique.Labels {
if clique.Labels[key] != value {
t.Errorf("existing label %s=%s was not preserved, got %s", key, value, clique.Labels[key])
}
}
}
} else {
// Verify no injection occurred
if clique.Spec.PodSpec.SchedulerName != tt.initialClique.Spec.PodSpec.SchedulerName {
t.Errorf("schedulerName should not have changed, expected %v, got %v",
tt.initialClique.Spec.PodSpec.SchedulerName, clique.Spec.PodSpec.SchedulerName)
}
// Verify queue label was not added (unless it existed before)
if tt.initialClique.Labels == nil || tt.initialClique.Labels[commonconsts.KubeLabelKaiSchedulerQueue] == "" {
if clique.Labels != nil && clique.Labels[commonconsts.KubeLabelKaiSchedulerQueue] != "" {
t.Errorf("queue label should not have been added")
}
}
}
})
}
}
func TestEnsureQueueExists(t *testing.T) {
tests := []struct {
name string
queueName string
setupQueue bool
expectedError bool
errorContains string
}{
{
name: "queue exists",
queueName: "existing-queue",
setupQueue: true,
expectedError: false,
},
{
name: "queue does not exist",
queueName: "missing-queue",
setupQueue: false,
expectedError: true,
errorContains: "not found in cluster",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a fake dynamic client
dynamicScheme := runtime.NewScheme()
fakeDynamic := dynamicfake.NewSimpleDynamicClient(dynamicScheme)
if tt.setupQueue {
// Create a fake queue resource
queueGVR := schema.GroupVersionResource{
Group: "scheduling.run.ai",
Version: "v2",
Resource: "queues",
}
queue := &unstructured.Unstructured{}
queue.SetAPIVersion("scheduling.run.ai/v2")
queue.SetKind("Queue")
queue.SetName(tt.queueName)
_, err := fakeDynamic.Resource(queueGVR).Create(context.Background(), queue, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create fake queue: %v", err)
}
}
// This test is limited because we can't easily mock the dynamic client creation
// In a real test environment, you would set up a proper test cluster or use envtest
err := ensureQueueExists(context.Background(), fakeDynamic, tt.queueName)
if tt.expectedError {
if err == nil {
t.Errorf("expected error but got none")
} else if tt.errorContains != "" && !strings.Contains(err.Error(), tt.errorContains) {
t.Errorf("expected error to contain %q, got %v", tt.errorContains, err)
}
} else {
// We expect an error here because we can't properly mock the dynamic client
// In a real test, this would work with proper test setup
if err == nil {
t.Logf("Queue validation passed (this is expected in unit tests)")
}
}
})
}
}
...@@ -24,21 +24,22 @@ Dynamo supports multinode deployments through the `multinode` section in resourc ...@@ -24,21 +24,22 @@ Dynamo supports multinode deployments through the `multinode` section in resourc
For sophisticated multinode deployments, Dynamo integrates with advanced Kubernetes orchestration systems: For sophisticated multinode deployments, Dynamo integrates with advanced Kubernetes orchestration systems:
- **[Grove](https://github.com/NVIDIA/grove/blob/main/docs/installation.md)**: Network topology-aware gang scheduling and auto-scaling for AI workloads - **[Grove](https://github.com/NVIDIA/grove)**: Network topology-aware gang scheduling and auto-scaling for AI workloads
- (optional) **[KAI-Scheduler](https://github.com/NVIDIA/KAI-Scheduler)**: Kubernetes native scheduler optimized for AI workloads at scale - **[KAI-Scheduler](https://github.com/NVIDIA/KAI-Scheduler)**: Kubernetes native scheduler optimized for AI workloads at scale
These systems provide enhanced scheduling capabilities including topology-aware placement, gang scheduling, and coordinated auto-scaling across multiple nodes. These systems provide enhanced scheduling capabilities including topology-aware placement, gang scheduling, and coordinated auto-scaling across multiple nodes.
**Features Enabled with Grove:** **Features Enabled with Grove:**
- Hierarchical gang scheduling with `PodGangSet` and `PodClique` - Declarative composition of AI workloads
- Multi-level horizontal auto-scaling - Multi-level horizontal auto-scaling
- Custom startup ordering for components - Custom startup ordering for components
- Resource-aware rolling updates - Resource-aware rolling updates
[KAI-Scheduler](https://github.com/NVIDIA/KAI-Scheduler) is an optional enhancement that provides a Kubernetes native scheduler optimized for AI workloads at large scale. [KAI-Scheduler](https://github.com/NVIDIA/KAI-Scheduler) is a Kubernetes native scheduler optimized for AI workloads at large scale.
**Features Enabled with KAI-Scheduler:** **Features Enabled with KAI-Scheduler:**
- Gang scheduling
- Network topology-aware pod placement - Network topology-aware pod placement
- AI workload-optimized scheduling algorithms - AI workload-optimized scheduling algorithms
- GPU resource awareness and allocation - GPU resource awareness and allocation
...@@ -46,6 +47,14 @@ These systems provide enhanced scheduling capabilities including topology-aware ...@@ -46,6 +47,14 @@ These systems provide enhanced scheduling capabilities including topology-aware
- Integration with Grove for enhanced capabilities - Integration with Grove for enhanced capabilities
- Performance optimizations for large-scale deployments - Performance optimizations for large-scale deployments
##### Prerequisites
- [Grove](https://github.com/NVIDIA/grove/blob/main/docs/installation.md) installed on the cluster
- (Optional) [KAI-Scheduler](https://github.com/NVIDIA/KAI-Scheduler) installed on the cluster with default queue name `dynamo` created. You can use a different queue name by setting the `nvidia.com/kai-scheduler-queue` annotation on the DGD resource.
KAI-Scheduler is optional but recommended for advanced scheduling capabilities.
#### Using LWS and Volcano #### Using LWS and Volcano
LWS is a simple multinode deployment mechanism that allows you to deploy a workload across multiple nodes. LWS is a simple multinode deployment mechanism that allows you to deploy a workload across multiple nodes.
...@@ -58,16 +67,70 @@ Volcano is a Kubernetes native scheduler optimized for AI workloads at scale. It ...@@ -58,16 +67,70 @@ Volcano is a Kubernetes native scheduler optimized for AI workloads at scale. It
## Core Concepts ## Core Concepts
### Orchestrator Selection Algorithm
Dynamo automatically selects the best available orchestrator for multinode deployments using the following logic:
#### When Both Grove and LWS are Available:
- **Grove is selected by default** (recommended for advanced AI workloads)
- **LWS is selected** if you explicitly set `nvidia.com/enable-grove: "false"` annotation on your DGD resource
#### When Only One Orchestrator is Available:
- The installed orchestrator (Grove or LWS) is automatically selected
#### Scheduler Integration:
- **With Grove**: Automatically integrates with [KAI-Scheduler](https://github.com/NVIDIA/KAI-Scheduler) when available, providing:
- Advanced queue management via `nvidia.com/kai-scheduler-queue` annotation
- AI-optimized scheduling policies
- Resource-aware workload placement
- **With LWS**: Uses Volcano scheduler for gang scheduling and resource coordination
#### Configuration Examples:
**Default (Grove with KAI-Scheduler):**
```yaml
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: my-multinode-deployment
annotations:
nvidia.com/kai-scheduler-queue: "gpu-intensive" # Optional: defaults to "dynamo"
spec:
# ... your deployment spec
```
**Force LWS usage:**
```yaml
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: my-multinode-deployment
annotations:
nvidia.com/enable-grove: "false"
spec:
# ... your deployment spec
```
### The `multinode` Section ### The `multinode` Section
The `multinode` section in a resource specification defines how many physical nodes the workload should span: The `multinode` section in a resource specification defines how many physical nodes the workload should span:
```yaml ```yaml
multinode: apiVersion: nvidia.com/v1alpha1
nodeCount: 2 kind: DynamoGraphDeployment
resources: metadata:
limits: name: my-multinode-deployment
gpu: "2" # 2 GPUs per node spec:
# ... your deployment spec
services:
my-service:
...
multinode:
nodeCount: 2
resources:
limits:
gpu: "2" # 2 GPUs per node
``` ```
### GPU Distribution ### GPU Distribution
...@@ -88,16 +151,28 @@ The tensor parallelism (`tp-size` or `--tp`) in your command/args must match the ...@@ -88,16 +151,28 @@ The tensor parallelism (`tp-size` or `--tp`) in your command/args must match the
```yaml ```yaml
# Example: 2 multinode.nodeCount × 4 GPUs = 8 total GPUs # Example: 2 multinode.nodeCount × 4 GPUs = 8 total GPUs
multinode: apiVersion: nvidia.com/v1alpha1
nodeCount: 2 kind: DynamoGraphDeployment
resources: metadata:
limits: name: my-multinode-deployment
gpu: "4" spec:
# ... your deployment spec
# Command args must use tp-size=8 services:
args: my-service:
- "--tp-size" ...
- "8" # Must equal multinode.nodeCount × gpu multinode:
nodeCount: 2
resources:
limits:
gpu: "4"
extraPodSpec:
mainContainer:
...
args:
# Command args must use tp-size=8
- "--tp-size"
- "8" # Must equal multinode.nodeCount × gpu
``` ```
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment