Unverified Commit 5d90e530 authored by Rohan Varma's avatar Rohan Varma Committed by GitHub
Browse files

fix: mpi flow and add resourceClaim (#3446)


Signed-off-by: default avatarRohan Varma <rohanv@nvidia.com>
parent 2626126a
...@@ -10173,6 +10173,26 @@ spec: ...@@ -10173,6 +10173,26 @@ spec:
Resources requested and limits for this component, including CPU, memory, Resources requested and limits for this component, including CPU, memory,
GPUs/devices, and any runtime-specific resources. GPUs/devices, and any runtime-specific resources.
properties: properties:
claims:
items:
description: ResourceClaim references one entry in PodSpec.ResourceClaims.
properties:
name:
description: |-
Name must match the name of one entry in pod.spec.resourceClaims of
the Pod where this field is used. It makes that resource available
inside a container.
type: string
request:
description: |-
Request is the name chosen for a request in the referenced claim.
If empty, everything from the claim is made available, otherwise
only the result of this request.
type: string
required:
- name
type: object
type: array
limits: limits:
properties: properties:
cpu: cpu:
......
...@@ -10307,6 +10307,26 @@ spec: ...@@ -10307,6 +10307,26 @@ spec:
Resources requested and limits for this component, including CPU, memory, Resources requested and limits for this component, including CPU, memory,
GPUs/devices, and any runtime-specific resources. GPUs/devices, and any runtime-specific resources.
properties: properties:
claims:
items:
description: ResourceClaim references one entry in PodSpec.ResourceClaims.
properties:
name:
description: |-
Name must match the name of one entry in pod.spec.resourceClaims of
the Pod where this field is used. It makes that resource available
inside a container.
type: string
request:
description: |-
Request is the name chosen for a request in the referenced claim.
If empty, everything from the claim is made available, otherwise
only the result of this request.
type: string
required:
- name
type: object
type: array
limits: limits:
properties: properties:
cpu: cpu:
......
...@@ -32,8 +32,9 @@ type ResourceItem struct { ...@@ -32,8 +32,9 @@ type ResourceItem struct {
} }
type Resources struct { type Resources struct {
Requests *ResourceItem `json:"requests,omitempty"` Requests *ResourceItem `json:"requests,omitempty"`
Limits *ResourceItem `json:"limits,omitempty"` Limits *ResourceItem `json:"limits,omitempty"`
Claims []corev1.ResourceClaim `json:"claims,omitempty"`
} }
type DeploymentTargetHPAConf struct { type DeploymentTargetHPAConf struct {
......
...@@ -190,6 +190,11 @@ func (in *Resources) DeepCopyInto(out *Resources) { ...@@ -190,6 +190,11 @@ func (in *Resources) DeepCopyInto(out *Resources) {
*out = new(ResourceItem) *out = new(ResourceItem)
(*in).DeepCopyInto(*out) (*in).DeepCopyInto(*out)
} }
if in.Claims != nil {
in, out := &in.Claims, &out.Claims
*out = make([]v1.ResourceClaim, len(*in))
copy(*out, *in)
}
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Resources. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Resources.
......
...@@ -10173,6 +10173,26 @@ spec: ...@@ -10173,6 +10173,26 @@ spec:
Resources requested and limits for this component, including CPU, memory, Resources requested and limits for this component, including CPU, memory,
GPUs/devices, and any runtime-specific resources. GPUs/devices, and any runtime-specific resources.
properties: properties:
claims:
items:
description: ResourceClaim references one entry in PodSpec.ResourceClaims.
properties:
name:
description: |-
Name must match the name of one entry in pod.spec.resourceClaims of
the Pod where this field is used. It makes that resource available
inside a container.
type: string
request:
description: |-
Request is the name chosen for a request in the referenced claim.
If empty, everything from the claim is made available, otherwise
only the result of this request.
type: string
required:
- name
type: object
type: array
limits: limits:
properties: properties:
cpu: cpu:
......
...@@ -10307,6 +10307,26 @@ spec: ...@@ -10307,6 +10307,26 @@ spec:
Resources requested and limits for this component, including CPU, memory, Resources requested and limits for this component, including CPU, memory,
GPUs/devices, and any runtime-specific resources. GPUs/devices, and any runtime-specific resources.
properties: properties:
claims:
items:
description: ResourceClaim references one entry in PodSpec.ResourceClaims.
properties:
name:
description: |-
Name must match the name of one entry in pod.spec.resourceClaims of
the Pod where this field is used. It makes that resource available
inside a container.
type: string
request:
description: |-
Request is the name chosen for a request in the referenced claim.
If empty, everything from the claim is made available, otherwise
only the result of this request.
type: string
required:
- name
type: object
type: array
limits: limits:
properties: properties:
cpu: cpu:
......
...@@ -468,6 +468,12 @@ func GetResourcesConfig(resources *common.Resources) (*corev1.ResourceRequiremen ...@@ -468,6 +468,12 @@ func GetResourcesConfig(resources *common.Resources) (*corev1.ResourceRequiremen
currentResources.Requests[corev1.ResourceName(k)] = q currentResources.Requests[corev1.ResourceName(k)] = q
} }
} }
if resources.Claims != nil {
if currentResources.Claims == nil {
currentResources.Claims = make([]corev1.ResourceClaim, 0)
}
currentResources.Claims = append(currentResources.Claims, resources.Claims...)
}
return currentResources, nil return currentResources, nil
} }
......
...@@ -143,12 +143,12 @@ func (b *TRTLLMBackend) setupLeaderContainer(container *corev1.Container, number ...@@ -143,12 +143,12 @@ func (b *TRTLLMBackend) setupLeaderContainer(container *corev1.Container, number
// Build mpirun command with explicit SSH configuration and environment variables // Build mpirun command with explicit SSH configuration and environment variables
// Wrap the entire command (trtllm-llmapi-launch + original command) in bash -c for proper shell interpretation // Wrap the entire command (trtllm-llmapi-launch + original command) in bash -c for proper shell interpretation
wrappedCommand := fmt.Sprintf("bash -c 'source /opt/dynamo/venv/bin/activate && trtllm-llmapi-launch %s'", originalCommand) wrappedCommand := fmt.Sprintf("bash -c 'trtllm-llmapi-launch %s'", originalCommand)
// Generate environment variable flags for mpirun // Generate environment variable flags for mpirun
envVarsStr := generateEnvVarFlags(container.Env) envVarsStr := generateEnvVarFlags(container.Env)
mpirunCmd := fmt.Sprintf("mpirun --oversubscribe -n %d -H %s --mca pml ob1 --mca plm_rsh_args \"-p %d -o StrictHostKeyChecking=no -i ~/.ssh/id_rsa\" %s %s", mpirunCmd := fmt.Sprintf("mpirun --allow-run-as-root --oversubscribe -n %d -H %s --mca pml ob1 --mca plm_rsh_args \"-p %d -o StrictHostKeyChecking=no -i ~/.ssh/id_rsa\" %s %s",
totalGPUs, totalGPUs,
workerHosts, workerHosts,
commonconsts.MpiRunSshPort, commonconsts.MpiRunSshPort,
......
...@@ -759,6 +759,14 @@ func GenerateBasePodSpec( ...@@ -759,6 +759,14 @@ func GenerateBasePodSpec(
maps.Copy(container.Resources.Limits, overrideResources.Limits) maps.Copy(container.Resources.Limits, overrideResources.Limits)
} }
// Claims
if overrideResources != nil && len(overrideResources.Claims) > 0 {
if container.Resources.Claims == nil {
container.Resources.Claims = []corev1.ResourceClaim{}
}
container.Resources.Claims = append(container.Resources.Claims, overrideResources.Claims...)
}
shouldDisableImagePullSecret := component.Annotations[commonconsts.KubeAnnotationDisableImagePullSecretDiscovery] == commonconsts.KubeLabelValueTrue shouldDisableImagePullSecret := component.Annotations[commonconsts.KubeAnnotationDisableImagePullSecretDiscovery] == commonconsts.KubeLabelValueTrue
imagePullSecrets := []corev1.LocalObjectReference{} imagePullSecrets := []corev1.LocalObjectReference{}
...@@ -846,6 +854,7 @@ func GenerateBasePodSpec( ...@@ -846,6 +854,7 @@ func GenerateBasePodSpec(
podSpec.Containers = append(podSpec.Containers, container) podSpec.Containers = append(podSpec.Containers, container)
podSpec.Volumes = append(podSpec.Volumes, volumes...) podSpec.Volumes = append(podSpec.Volumes, volumes...)
podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, imagePullSecrets...) podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, imagePullSecrets...)
backend.UpdatePodSpec(&podSpec, numberOfNodes, role, component, serviceName) backend.UpdatePodSpec(&podSpec, numberOfNodes, role, component, serviceName)
return controller_common.CanonicalizePodSpec(&podSpec), nil return controller_common.CanonicalizePodSpec(&podSpec), nil
} }
......
...@@ -4751,6 +4751,271 @@ func TestGenerateBasePodSpec_VolumeMounts(t *testing.T) { ...@@ -4751,6 +4751,271 @@ func TestGenerateBasePodSpec_VolumeMounts(t *testing.T) {
} }
} }
func TestGenerateBasePodSpec_ResourceClaims(t *testing.T) {
secretsRetriever := &mockSecretsRetriever{}
controllerConfig := controller_common.Config{}
tests := []struct {
name string
component *v1alpha1.DynamoComponentDeploymentOverridesSpec
expectError bool
expectedResourceClaims []corev1.ResourceClaim
expectedPodClaims []corev1.PodResourceClaim
expectedVolumes []corev1.Volume
}{
{
name: "component with resource claims",
component: &v1alpha1.DynamoComponentDeploymentOverridesSpec{
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ComponentType: commonconsts.ComponentTypeWorker,
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "130",
Memory: "800Gi",
},
Limits: &common.ResourceItem{
CPU: "130",
Memory: "800Gi",
GPU: "4",
},
Claims: []corev1.ResourceClaim{
{
Name: "compute-domain-channel",
},
},
},
ExtraPodSpec: &common.ExtraPodSpec{
PodSpec: &corev1.PodSpec{
ResourceClaims: []corev1.PodResourceClaim{
{
Name: "compute-domain-channel",
ResourceClaimTemplateName: ptr.To("trtllm-test-compute-domain-channel"),
},
},
Volumes: []corev1.Volume{
{
Name: "model-storage",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: "dynamo-pvc",
},
},
},
},
},
MainContainer: &corev1.Container{
Image: "rohanv672/dynamo:v0.5.1-trtllm",
Args: []string{
"python3 -m dynamo.trtllm --model-path /data/deepseek-r1 --served-model-name deepseek-ai/DeepSeek-R1 --extra-engine-args /data/engine_configs/wide_ep_agg.yaml",
},
Command: []string{"/bin/sh", "-c"},
VolumeMounts: []corev1.VolumeMount{
{
Name: "model-storage",
MountPath: "/data",
},
},
},
},
},
},
expectError: false,
expectedResourceClaims: []corev1.ResourceClaim{
{
Name: "compute-domain-channel",
},
},
expectedPodClaims: []corev1.PodResourceClaim{
{
Name: "compute-domain-channel",
ResourceClaimTemplateName: ptr.To("trtllm-test-compute-domain-channel"),
},
},
expectedVolumes: []corev1.Volume{
{
Name: "model-storage",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: "dynamo-pvc",
},
},
},
{
Name: "shared-memory",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{
Medium: corev1.StorageMediumMemory,
SizeLimit: func() *resource.Quantity { q := resource.MustParse(commonconsts.DefaultSharedMemorySize); return &q }(),
},
},
},
},
},
{
name: "component with multiple resource claims",
component: &v1alpha1.DynamoComponentDeploymentOverridesSpec{
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ComponentType: commonconsts.ComponentTypeWorker,
Resources: &common.Resources{
Claims: []corev1.ResourceClaim{
{
Name: "compute-domain-channel",
},
{
Name: "network-domain-channel",
},
},
},
ExtraPodSpec: &common.ExtraPodSpec{
PodSpec: &corev1.PodSpec{
ResourceClaims: []corev1.PodResourceClaim{
{
Name: "compute-domain-channel",
ResourceClaimTemplateName: ptr.To("compute-template"),
},
{
Name: "network-domain-channel",
ResourceClaimTemplateName: ptr.To("network-template"),
},
},
},
MainContainer: &corev1.Container{
Image: "test-image",
Command: []string{"python3"},
Args: []string{"-m", "dynamo.worker"},
},
},
},
},
expectError: false,
expectedResourceClaims: []corev1.ResourceClaim{
{
Name: "compute-domain-channel",
},
{
Name: "network-domain-channel",
},
},
expectedPodClaims: []corev1.PodResourceClaim{
{
Name: "compute-domain-channel",
ResourceClaimTemplateName: ptr.To("compute-template"),
},
{
Name: "network-domain-channel",
ResourceClaimTemplateName: ptr.To("network-template"),
},
},
},
{
name: "component without resource claims",
component: &v1alpha1.DynamoComponentDeploymentOverridesSpec{
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ComponentType: commonconsts.ComponentTypeFrontend,
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
},
},
},
},
expectError: false,
expectedResourceClaims: nil,
expectedPodClaims: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
podSpec, err := GenerateBasePodSpec(
tt.component,
BackendFrameworkTRTLLM,
secretsRetriever,
"test-deployment",
"default",
RoleMain,
1,
controllerConfig,
commonconsts.MultinodeDeploymentTypeGrove,
"test-service",
)
if tt.expectError {
if err == nil {
t.Errorf("GenerateBasePodSpec() expected error, got nil")
}
return
}
if err != nil {
t.Errorf("GenerateBasePodSpec() unexpected error: %v", err)
return
}
// Check containers exist
if len(podSpec.Containers) == 0 {
t.Errorf("GenerateBasePodSpec() no containers found")
return
}
container := podSpec.Containers[0]
// Check resource claims in container resources using reflect.DeepEqual
if !reflect.DeepEqual(container.Resources.Claims, tt.expectedResourceClaims) {
t.Errorf("GenerateBasePodSpec() resource claims mismatch:\ngot: %+v\nwant: %+v",
container.Resources.Claims, tt.expectedResourceClaims)
}
// Check pod resource claims using reflect.DeepEqual
if !reflect.DeepEqual(podSpec.ResourceClaims, tt.expectedPodClaims) {
t.Errorf("GenerateBasePodSpec() pod resource claims mismatch:\ngot: %+v\nwant: %+v",
podSpec.ResourceClaims, tt.expectedPodClaims)
}
// Check expected volumes if specified using reflect.DeepEqual
if tt.expectedVolumes != nil {
if !reflect.DeepEqual(podSpec.Volumes, tt.expectedVolumes) {
t.Errorf("GenerateBasePodSpec() volumes mismatch:\ngot: %+v\nwant: %+v",
podSpec.Volumes, tt.expectedVolumes)
}
}
// Verify resource requests and limits are properly set when claims are present
if len(tt.expectedResourceClaims) > 0 {
// Check that standard resources are still processed correctly
if tt.component.Resources != nil {
if tt.component.Resources.Requests != nil {
if tt.component.Resources.Requests.CPU != "" {
if container.Resources.Requests == nil {
t.Errorf("GenerateBasePodSpec() expected CPU request to be set")
} else if cpu, exists := container.Resources.Requests[corev1.ResourceCPU]; !exists || cpu.IsZero() {
t.Errorf("GenerateBasePodSpec() expected CPU request to be set")
}
}
if tt.component.Resources.Requests.Memory != "" {
if container.Resources.Requests == nil {
t.Errorf("GenerateBasePodSpec() expected Memory request to be set")
} else if memory, exists := container.Resources.Requests[corev1.ResourceMemory]; !exists || memory.IsZero() {
t.Errorf("GenerateBasePodSpec() expected Memory request to be set")
}
}
}
if tt.component.Resources.Limits != nil {
if tt.component.Resources.Limits.GPU != "" {
if container.Resources.Limits == nil {
t.Errorf("GenerateBasePodSpec() expected GPU limit to be set")
} else if gpu, exists := container.Resources.Limits[corev1.ResourceName("nvidia.com/gpu")]; !exists || gpu.IsZero() {
t.Errorf("GenerateBasePodSpec() expected GPU limit to be set")
}
}
}
}
}
})
}
}
func TestGenerateBasePodSpec_UseAsCompilationCache_BackendSupport(t *testing.T) { func TestGenerateBasePodSpec_UseAsCompilationCache_BackendSupport(t *testing.T) {
secretsRetriever := &mockSecretsRetriever{} secretsRetriever := &mockSecretsRetriever{}
controllerConfig := controller_common.Config{} controllerConfig := controller_common.Config{}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment