"lib/llm/src/entrypoint/input/common.rs" did not exist on "c7067fc2eca4f65d48acf4f4b78fd72c7488ac92"
Unverified Commit a97602a2 authored by julienmancuso's avatar julienmancuso Committed by GitHub
Browse files

fix: Implement scaling Grove subresources (#2531)

parent af3d8aa0
......@@ -19,11 +19,11 @@ maintainers:
url: https://www.nvidia.com
description: A Helm chart for NVIDIA Dynamo Platform.
type: application
version: 0.4.0
version: 0.4.1
home: https://nvidia.com
dependencies:
- name: dynamo-operator
version: 0.4.0
version: 0.4.1
repository: file://components/operator
condition: dynamo-operator.enabled
- name: nats
......
......@@ -27,9 +27,9 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.4.0
version: 0.4.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.4.0"
appVersion: "0.4.1"
......@@ -128,6 +128,15 @@ rules:
- patch
- update
- watch
- apiGroups:
- grove.io
resources:
- podcliques/scale
- podcliquescalinggroups/scale
verbs:
- get
- patch
- update
- apiGroups:
- apps
resources:
......
......@@ -31,9 +31,13 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
k8sCache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"
......@@ -65,6 +69,34 @@ var (
setupLog = ctrl.Log.WithName("setup")
)
func createScalesGetter(mgr ctrl.Manager) (scale.ScalesGetter, error) {
config := mgr.GetConfig()
// Create kubernetes client for discovery
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
// Create cached discovery client
cachedDiscovery := memory.NewMemCacheClient(kubeClient.Discovery())
// Create REST mapper
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscovery)
scalesGetter, err := scale.NewForConfig(
config,
restMapper,
dynamic.LegacyAPIPathResolverFunc,
scale.NewDiscoveryScaleKindResolver(cachedDiscovery),
)
if err != nil {
return nil, err
}
return scalesGetter, nil
}
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
......@@ -321,11 +353,19 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "DynamoComponentDeployment")
os.Exit(1)
}
// Create scale client for Grove resource scaling
scaleClient, err := createScalesGetter(mgr)
if err != nil {
setupLog.Error(err, "unable to create scale client")
os.Exit(1)
}
if err = (&controller.DynamoGraphDeploymentReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("dynamographdeployment"),
Config: ctrlConfig,
DockerSecretRetriever: dockerSecretRetriever,
ScaleClient: scaleClient,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamoGraphDeployment")
os.Exit(1)
......
......@@ -98,6 +98,15 @@ rules:
- patch
- update
- watch
- apiGroups:
- grove.io
resources:
- podcliques/scale
- podcliquescalinggroups/scale
verbs:
- get
- patch
- update
- apiGroups:
- grove.io
resources:
......
......@@ -20,12 +20,17 @@ package controller
import (
"context"
"fmt"
"strings"
grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
......@@ -50,6 +55,20 @@ const (
PendingState State = "pending"
)
var (
// Grove GroupVersionResources for scaling operations
podCliqueGVR = schema.GroupVersionResource{
Group: "grove.io",
Version: "v1alpha1",
Resource: "podcliques",
}
podCliqueScalingGroupGVR = schema.GroupVersionResource{
Group: "grove.io",
Version: "v1alpha1",
Resource: "podcliquescalinggroups",
}
)
type etcdStorage interface {
DeleteKeys(ctx context.Context, prefix string) error
}
......@@ -60,12 +79,15 @@ type DynamoGraphDeploymentReconciler struct {
Config commonController.Config
Recorder record.EventRecorder
DockerSecretRetriever dockerSecretRetriever
ScaleClient scale.ScalesGetter
}
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
......@@ -156,6 +178,80 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
}
// scaleGroveResource scales a Grove resource using the generic scaling function
func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context, resourceName, namespace string, newReplicas int32, resourceType string) error {
logger := log.FromContext(ctx)
// Determine the GroupVersionResource based on resource type
var gvr schema.GroupVersionResource
switch resourceType {
case "PodClique":
gvr = podCliqueGVR
case "PodCliqueScalingGroup":
gvr = podCliqueScalingGroupGVR
default:
return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
}
// Use the generic scaling function
err := commonController.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas)
if err != nil {
if errors.IsNotFound(err) {
// Resource doesn't exist yet - this is normal during initial creation when Grove is still creating the resources asynchronously
logger.V(1).Info("Grove resource not found yet, skipping scaling for now - will retry on next reconciliation", "gvr", gvr, "name", resourceName, "namespace", namespace)
return nil
}
}
return err
}
// reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes
func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
logger := log.FromContext(ctx)
logger.V(1).Info("Reconciling Grove scaling operations")
replicaIndex := 0
for serviceName, component := range dynamoDeployment.Spec.Services {
// Skip if replicas are not specified
if component.Replicas == nil {
continue
}
numberOfNodes := component.GetNumberOfNodes()
isMultinode := numberOfNodes > 1
if isMultinode {
// Scale PodCliqueScalingGroup for multinode services
// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
err := r.scaleGroveResource(ctx,
resourceName,
dynamoDeployment.Namespace,
*component.Replicas,
"PodCliqueScalingGroup")
if err != nil {
logger.Error(err, "Failed to scale PodCliqueScalingGroup", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
return fmt.Errorf("failed to scale PodCliqueScalingGroup %s: %w", resourceName, err)
}
} else {
// Scale individual PodClique for single-node services
// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
err := r.scaleGroveResource(ctx,
resourceName,
dynamoDeployment.Namespace,
*component.Replicas,
"PodClique")
if err != nil {
logger.Error(err, "Failed to scale PodClique", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
return fmt.Errorf("failed to scale PodClique %s: %w", resourceName, err)
}
}
}
logger.V(1).Info("Successfully reconciled Grove scaling operations")
return nil
}
func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
logger := log.FromContext(ctx)
// generate the dynamoComponentsDeployments from the config
......@@ -177,6 +273,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
}
return false
})
// Handle Grove scaling operations after structural changes
if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil {
logger.Error(err, "failed to reconcile Grove scaling")
return FailedState, "grove_scaling_failed", Message(err.Error()), err
}
resources := []Resource{groveGangSetAsResource}
for componentName, component := range dynamoDeployment.Spec.Services {
if component.ComponentType == consts.ComponentTypeFrontend {
......@@ -203,10 +306,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
ingressSpec = *component.Ingress
}
mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
if err != nil {
logger.Error(err, "failed to generate the main component ingress")
return "", "", "", fmt.Errorf("failed to generate the main component ingress: %w", err)
}
_, syncedMainComponentIngress, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
logger.Info("Ingress is not enabled")
......@@ -224,10 +323,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
// generate the main component virtual service
if r.Config.IngressConfig.UseVirtualService() {
mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
if err != nil {
logger.Error(err, "failed to generate the main component virtual service")
return "", "", "", fmt.Errorf("failed to generate the main component virtual service: %w", err)
}
_, syncedMainComponentVirtualService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
if !ingressSpec.IsVirtualServiceEnabled() {
logger.Info("VirtualService is not enabled")
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller_common
import (
"context"
"fmt"
autoscalingv1 "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// ScaleResource scales any Kubernetes resource using the Scale subresource
func ScaleResource(ctx context.Context, scaleClient scale.ScalesGetter, gvr schema.GroupVersionResource, namespace, name string, replicas int32) error {
logger := log.FromContext(ctx)
logger.Info("Scaling resource", "gvr", gvr, "name", name, "namespace", namespace, "replicas", replicas)
if scaleClient == nil {
logger.Error(nil, "Scale client is nil")
return fmt.Errorf("scale client is nil")
}
currentScale, err := scaleClient.Scales(namespace).Get(ctx, gvr.GroupResource(), name, metav1.GetOptions{})
if err != nil {
logger.Error(err, "Failed to get current scale - resource may not support scale subresource", "gvr", gvr, "name", name, "namespace", namespace, "groupResource", gvr.GroupResource())
return fmt.Errorf("failed to get current scale for %s %s (resource may not support scale subresource): %w", gvr.Resource, name, err)
}
if replicas < 0 {
return fmt.Errorf("replicas must be >= 0, got %d", replicas)
}
if currentScale.Spec.Replicas == replicas {
logger.V(1).Info("Resource already at desired replica count", "gvr", gvr, "name", name, "replicas", replicas)
return nil
}
scaleObj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
ResourceVersion: currentScale.ObjectMeta.ResourceVersion,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: replicas,
},
}
logger.V(1).Info("Updating scale", "gvr", gvr, "name", name, "newReplicas", replicas)
_, err = scaleClient.Scales(namespace).Update(ctx, gvr.GroupResource(), scaleObj, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "Failed to update scale", "gvr", gvr, "name", name, "replicas", replicas)
return fmt.Errorf("failed to update scale for %s %s: %w", gvr.Resource, name, err)
}
logger.Info("Successfully scaled resource", "gvr", gvr, "name", name, "oldReplicas", currentScale.Spec.Replicas, "newReplicas", replicas)
return nil
}
......@@ -17,5 +17,5 @@ apiVersion: v2
name: dynamo-graph
description: A Helm chart to deploy a Dynamo graph on Kubernetes
type: application
version: 0.4.0
appVersion: 0.4.0
version: 0.4.1
appVersion: 0.4.1
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment