Unverified Commit ad0648f4 authored by hhzhang16's avatar hhzhang16 Committed by GitHub
Browse files

test: add error bubbling from profiling and more error handling tests (#3785)


Signed-off-by: default avatarHannah Zhang <hannahz@nvidia.com>
parent eb73c2b0
......@@ -236,6 +236,7 @@ func (r *DynamoGraphDeploymentRequestReconciler) FinalizeResource(ctx context.Co
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
......@@ -1004,6 +1005,11 @@ func (r *DynamoGraphDeploymentRequestReconciler) checkProfilingJobStatus(ctx con
return true, nil
}
if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue {
// Get detailed error from pod logs
detailedError := r.getProfilingJobErrorDetails(ctx, dgdr, job)
if detailedError != "" {
return false, fmt.Errorf("profiling job failed: %s. Details: %s", condition.Message, detailedError)
}
return false, fmt.Errorf("profiling job failed: %s", condition.Message)
}
}
......@@ -1011,6 +1017,56 @@ func (r *DynamoGraphDeploymentRequestReconciler) checkProfilingJobStatus(ctx con
return false, nil
}
// getProfilingJobErrorDetails retrieves detailed error information from failed profiling job pods
func (r *DynamoGraphDeploymentRequestReconciler) getProfilingJobErrorDetails(ctx context.Context, dgdr *nvidiacomv1alpha1.DynamoGraphDeploymentRequest, job *batchv1.Job) string {
logger := log.FromContext(ctx)
// List pods owned by this job
podList := &corev1.PodList{}
labelSelector := client.MatchingLabels{
"job-name": job.Name,
}
if err := r.List(ctx, podList, client.InNamespace(dgdr.Namespace), labelSelector); err != nil {
logger.Error(err, "Failed to list pods for profiling job")
return ""
}
// Look for failed pods and extract error details
for _, pod := range podList.Items {
// Check pod phase and container statuses
if pod.Status.Phase == corev1.PodFailed {
// Get profiler container status (first container)
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Terminated != nil {
terminated := containerStatus.State.Terminated
// Construct detailed error message
errorMsg := fmt.Sprintf("Pod: %s, Container: %s, ExitCode: %d, Reason: %s",
pod.Name, containerStatus.Name, terminated.ExitCode, terminated.Reason)
if terminated.Message != "" {
errorMsg += fmt.Sprintf(", Message: %s", terminated.Message)
}
logger.Info("Retrieved profiling job error details", "error", errorMsg)
return errorMsg
}
}
// If no terminated state found, check waiting state
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Waiting != nil {
waiting := containerStatus.State.Waiting
errorMsg := fmt.Sprintf("Pod: %s, Container: %s, Waiting - Reason: %s, Message: %s",
pod.Name, containerStatus.Name, waiting.Reason, waiting.Message)
logger.Info("Retrieved profiling job waiting details", "error", errorMsg)
return errorMsg
}
}
}
}
return ""
}
// generateDGDSpec generates DGD spec from profiling results (online or offline/AIC)
func (r *DynamoGraphDeploymentRequestReconciler) generateDGDSpec(ctx context.Context, dgdr *nvidiacomv1alpha1.DynamoGraphDeploymentRequest) error {
logger := log.FromContext(ctx)
......
......@@ -27,6 +27,7 @@ import (
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
......@@ -872,3 +873,314 @@ var _ = Describe("DGDR Validation", func() {
})
})
})
var _ = Describe("DGDR Profiler Arguments", func() {
var reconciler *DynamoGraphDeploymentRequestReconciler
BeforeEach(func() {
reconciler = &DynamoGraphDeploymentRequestReconciler{
Client: k8sClient,
Recorder: record.NewFakeRecorder(100),
ProfilerImage: "test-profiler:latest",
Config: commonController.Config{
RestrictedNamespace: "",
},
RBACManager: &MockRBACManager{},
}
})
Context("When creating profiling job with all arguments", func() {
It("Should include all SLA and GPU arguments for online profiling", func() {
ctx := context.Background()
namespace := "default"
dgdrName := "test-args-online"
// Create ServiceAccount
sa := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: ServiceAccountProfilingJob,
Namespace: namespace,
},
}
_ = k8sClient.Create(ctx, sa)
defer k8sClient.Delete(ctx, sa)
dgdr := &nvidiacomv1alpha1.DynamoGraphDeploymentRequest{
ObjectMeta: metav1.ObjectMeta{
Name: dgdrName,
Namespace: namespace,
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentRequestSpec{
ModelName: "test-model",
Backend: BackendTRTLLM,
SLA: nvidiacomv1alpha1.SLASpec{
TTFT: 50,
ITL: 10,
ISL: 3000,
OSL: 500,
},
Online: true,
GPU: &nvidiacomv1alpha1.GPUSpec{
Type: "h200_sxm",
MinNumGPUsPerEngine: 2,
MaxNumGPUsPerEngine: 4,
},
},
}
Expect(k8sClient.Create(ctx, dgdr)).Should(Succeed())
defer k8sClient.Delete(ctx, dgdr)
// Re-fetch DGDR to get proper metadata from API server
var fetchedDGDR nvidiacomv1alpha1.DynamoGraphDeploymentRequest
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: dgdrName, Namespace: namespace}, &fetchedDGDR)).Should(Succeed())
// Create profiling job with properly initialized DGDR
err := reconciler.createProfilingJob(ctx, &fetchedDGDR)
Expect(err).NotTo(HaveOccurred())
// Verify job was created
jobName := getProfilingJobName(&fetchedDGDR)
job := &batchv1.Job{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: namespace}, job)).Should(Succeed())
// Verify profiler container arguments
profilerContainer := job.Spec.Template.Spec.Containers[0]
args := profilerContainer.Args
// Check all required arguments are present
Expect(args).Should(ContainElement("--namespace"))
Expect(args).Should(ContainElement(namespace))
Expect(args).Should(ContainElement("--backend"))
Expect(args).Should(ContainElement(BackendTRTLLM))
Expect(args).Should(ContainElement("--ttft"))
Expect(args).Should(ContainElement("50"))
Expect(args).Should(ContainElement("--itl"))
Expect(args).Should(ContainElement("10"))
Expect(args).Should(ContainElement("--isl"))
Expect(args).Should(ContainElement("3000"))
Expect(args).Should(ContainElement("--osl"))
Expect(args).Should(ContainElement("500"))
Expect(args).Should(ContainElement("--min-num-gpus-per-engine"))
Expect(args).Should(ContainElement("2"))
Expect(args).Should(ContainElement("--max-num-gpus-per-engine"))
Expect(args).Should(ContainElement("4"))
Expect(args).Should(ContainElement("--output-dir"))
// Verify AI Configurator args are NOT present for online profiling
Expect(args).ShouldNot(ContainElement("--use-ai-configurator"))
// Clean up
k8sClient.Delete(ctx, job)
})
It("Should include AI Configurator arguments for offline profiling", func() {
ctx := context.Background()
namespace := "default"
dgdrName := "test-args-offline"
// Create ServiceAccount
sa := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: ServiceAccountProfilingJob,
Namespace: namespace,
},
}
_ = k8sClient.Create(ctx, sa)
defer k8sClient.Delete(ctx, sa)
dgdr := &nvidiacomv1alpha1.DynamoGraphDeploymentRequest{
ObjectMeta: metav1.ObjectMeta{
Name: dgdrName,
Namespace: namespace,
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentRequestSpec{
ModelName: "QWEN3_32B",
Backend: BackendTRTLLM,
SLA: nvidiacomv1alpha1.SLASpec{
TTFT: 50,
ITL: 10,
ISL: 3000,
OSL: 500,
},
Online: false, // Offline profiling
GPU: &nvidiacomv1alpha1.GPUSpec{
Type: "h200_sxm",
MinNumGPUsPerEngine: 1,
MaxNumGPUsPerEngine: 8,
},
},
}
Expect(k8sClient.Create(ctx, dgdr)).Should(Succeed())
defer k8sClient.Delete(ctx, dgdr)
// Re-fetch DGDR to get proper metadata from API server
var fetchedDGDR nvidiacomv1alpha1.DynamoGraphDeploymentRequest
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: dgdrName, Namespace: namespace}, &fetchedDGDR)).Should(Succeed())
// Create profiling job with properly initialized DGDR
err := reconciler.createProfilingJob(ctx, &fetchedDGDR)
Expect(err).NotTo(HaveOccurred())
// Verify job was created
jobName := getProfilingJobName(&fetchedDGDR)
job := &batchv1.Job{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: namespace}, job)).Should(Succeed())
// Verify profiler container arguments
profilerContainer := job.Spec.Template.Spec.Containers[0]
args := profilerContainer.Args
// Check AI Configurator arguments are present
Expect(args).Should(ContainElement("--use-ai-configurator"))
Expect(args).Should(ContainElement("--aic-model-name"))
Expect(args).Should(ContainElement("QWEN3_32B"))
Expect(args).Should(ContainElement("--aic-system"))
Expect(args).Should(ContainElement("h200_sxm"))
Expect(args).Should(ContainElement("--aic-backend-version"))
Expect(args).Should(ContainElement("0.20.0"))
// Clean up
k8sClient.Delete(ctx, job)
})
})
})
var _ = Describe("DGDR Error Handling", func() {
var reconciler *DynamoGraphDeploymentRequestReconciler
var recorder *record.FakeRecorder
BeforeEach(func() {
recorder = record.NewFakeRecorder(100)
reconciler = &DynamoGraphDeploymentRequestReconciler{
Client: k8sClient,
Recorder: recorder,
ProfilerImage: "test-profiler:latest",
Config: commonController.Config{
RestrictedNamespace: "",
},
RBACManager: &MockRBACManager{},
}
})
Context("When profiling job fails", func() {
It("Should capture detailed error from pod termination state", func() {
ctx := context.Background()
namespace := "default"
dgdrName := "test-error-capture"
dgdr := &nvidiacomv1alpha1.DynamoGraphDeploymentRequest{
ObjectMeta: metav1.ObjectMeta{
Name: dgdrName,
Namespace: namespace,
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentRequestSpec{
ModelName: "test-model",
Backend: BackendVLLM,
SLA: nvidiacomv1alpha1.SLASpec{
TTFT: 100,
ITL: 1500,
ISL: 3000,
OSL: 5,
},
Online: true,
},
}
Expect(k8sClient.Create(ctx, dgdr)).Should(Succeed())
defer k8sClient.Delete(ctx, dgdr)
// Set status to Profiling
dgdr.Status.State = StateProfiling
Expect(k8sClient.Status().Update(ctx, dgdr)).Should(Succeed())
// Create failed job
jobName := getProfilingJobName(dgdr)
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: namespace,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: ContainerNameProfiler,
Image: "test",
}},
RestartPolicy: corev1.RestartPolicyNever,
},
},
},
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{{
Type: batchv1.JobFailed,
Status: corev1.ConditionTrue,
Message: "BackoffLimitExceeded",
}},
},
}
Expect(k8sClient.Create(ctx, job)).Should(Succeed())
defer k8sClient.Delete(ctx, job)
// Update job status
job.Status.Conditions = []batchv1.JobCondition{{
Type: batchv1.JobFailed,
Status: corev1.ConditionTrue,
Message: "BackoffLimitExceeded",
}}
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
// Create failed pod with termination details
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: jobName + "-pod",
Namespace: namespace,
Labels: map[string]string{
"job-name": jobName,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: ContainerNameProfiler,
Image: "test",
}},
RestartPolicy: corev1.RestartPolicyNever,
},
Status: corev1.PodStatus{
Phase: corev1.PodFailed,
ContainerStatuses: []corev1.ContainerStatus{{
Name: ContainerNameProfiler,
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 1,
Reason: "Error",
Message: "ValueError: Invalid model name for AI Configurator",
},
},
}},
},
}
Expect(k8sClient.Create(ctx, pod)).Should(Succeed())
defer k8sClient.Delete(ctx, pod)
// Reconcile - should capture error details
_, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: types.NamespacedName{Name: dgdrName, Namespace: namespace},
})
Expect(err).NotTo(HaveOccurred())
// Verify DGDR transitioned to Failed state
var updated nvidiacomv1alpha1.DynamoGraphDeploymentRequest
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: dgdrName, Namespace: namespace}, &updated)).Should(Succeed())
Expect(updated.Status.State).Should(Equal(StateFailed))
// Verify error condition contains detailed error
condition := meta.FindStatusCondition(updated.Status.Conditions, ConditionTypeProfiling)
Expect(condition).NotTo(BeNil())
Expect(condition.Status).Should(Equal(metav1.ConditionFalse))
Expect(condition.Message).Should(ContainSubstring("profiling job failed"))
})
})
})
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment