85 changes: 79 additions & 6 deletions controllers/evalhub/evaluation_failed_kueue_workloads_reconciler.go
100644 → 100755
@@ -15,6 +15,7 @@ import (
"time"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -36,6 +37,15 @@ const kueueWorkloadReasonInadmissible = "Inadmissible"
// failure on every reconcile.
const annotationKueueFailedWorkloadEventReported = "trustyai.opendatahub.io/evalhub-kueue-failed-workload-reported"

// messageCodeGPUUnavailable is sent to EvalHub when Kueue cannot admit a workload because GPU
// resources required by the adapter are not available in the requested queue.
const messageCodeGPUUnavailable = "gpu_unavailable"

// gpuResourceSuffixes are the trailing parts of Kubernetes extended resource names that identify
// GPU accelerators (e.g. "nvidia.com/gpu", "amd.com/gpu"). We match by suffix to avoid hard-coding
// vendor-specific resource names and to remain forward-compatible.
var gpuResourceSuffixes = []string{"/gpu", ".gpu"}
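
// Illustrative matches under the suffix rule above (assumed examples, not an exhaustive list):
//   "nvidia.com/gpu", "amd.com/gpu"      -> match "/gpu"
//   "cpu", "memory", "ephemeral-storage" -> no match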

// evalHubEvaluationFailedKueueWorkloadsControllerName matches ctrl.NewControllerManagedBy(mgr).Named(...).
const evalHubEvaluationFailedKueueWorkloadsControllerName = "evalhub-evaluation-failed-kueue-workloads"

@@ -144,6 +154,71 @@ func jobOwnerFromWorkload(wl *kueue.Workload) (name string, uid types.UID, ok bool) {
return "", "", false
}

// jobRequestsGPU returns true if any container (including init containers) in the Job's pod
// template requests or limits a GPU resource.
func jobRequestsGPU(job *batchv1.Job) bool {
return podSpecRequestsGPU(&job.Spec.Template.Spec)
}

func podSpecRequestsGPU(spec *corev1.PodSpec) bool {
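// Note: ephemeral containers are not inspected; Kubernetes rejects resource
// requests on ephemeral containers, so they can never ask for a GPU.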
for _, c := range spec.InitContainers {
if containerRequestsGPU(c) {
return true
}
}
for _, c := range spec.Containers {
if containerRequestsGPU(c) {
return true
}
}
return false
}

func containerRequestsGPU(c corev1.Container) bool {
for name := range c.Resources.Requests {
if isGPUResource(string(name)) {
return true
}
}
for name := range c.Resources.Limits {
if isGPUResource(string(name)) {
return true
}
}
return false
}

func isGPUResource(name string) bool {
for _, suffix := range gpuResourceSuffixes {
if strings.HasSuffix(name, suffix) {
return true
}
}
return false
}

// kueueConditionMentionsGPU returns true when the Kueue inadmissible condition message references
// a GPU resource by name (e.g. "nvidia.com/gpu"). This lets us distinguish GPU-specific quota
// failures from other admission failures without parsing structured data out of free-form text.
func kueueConditionMentionsGPU(msg string) bool {
for _, suffix := range gpuResourceSuffixes {
if strings.Contains(msg, suffix) {
return true
}
}
return false
}
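
// Example messages (mirroring the unit tests below):
//   "insufficient quota for nvidia.com/gpu in flavor default" -> true
//   "ClusterQueue foo is stopped"                             -> false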

// classifyKueueAdmissionFailure analyses the inadmissible condition on a Kueue workload,
// together with the owning Job, to produce a user-facing failure message and EvalHub message
// code. It intentionally avoids exposing internal cluster details (queue names, flavor names,
// raw quota numbers).
func classifyKueueAdmissionFailure(job *batchv1.Job, cond *metav1.Condition) (msg, messageCode string) {
if jobRequestsGPU(job) && kueueConditionMentionsGPU(cond.Message) {
return "GPU resources required by this evaluation are not currently available in the requested queue. The job will run when GPU capacity becomes available.", messageCodeGPUUnavailable
}
// Non-GPU or unrecognised failure: surface a generic queue-error without internal detail.
return "The evaluation job cannot be admitted to the requested queue. The job will run when sufficient resources become available.", messageCodeQueueError
}
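
// Classification sketch (condition messages are illustrative; see the unit tests for the full cases):
//   GPU job + condition mentioning a GPU resource (e.g. "nvidia.com/gpu") -> messageCodeGPUUnavailable
//   GPU job + condition with no GPU reference                             -> messageCodeQueueError
//   job without GPU requests, regardless of condition message             -> messageCodeQueueError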

func (r *EvalHubEvaluationFailedKueueWorkloadsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.Info("reconcile start",
@@ -238,12 +313,9 @@ func (r *EvalHubEvaluationFailedKueueWorkloadsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

msg := strings.TrimSpace(cond.Message)
if msg == "" {
msg = fmt.Sprintf("Kueue workload (conditionType=%s, reason=%s)", cond.Type, cond.Reason)
}
failureMsg, failureCode := classifyKueueAdmissionFailure(&job, cond)

if err := postEvalHubBenchmarkFailed(ctx, r.RESTConfig, baseURL, job.Namespace, jobID, providerID, benchmarkID, benchmarkIndex, msg, messageCodeQueueError); err != nil {
if err := postEvalHubBenchmarkFailed(ctx, r.RESTConfig, baseURL, job.Namespace, jobID, providerID, benchmarkID, benchmarkIndex, failureMsg, failureCode); err != nil {
log.Error(err, "failed to post EvalHub benchmark failure event for Kueue workload",
append(evaluationFailedKueueWorkloadsLogFields(), "action", "post_events_failed",
"workload", wl.Name, "workloadNamespace", wl.Namespace, "queue", wl.Spec.QueueName,
@@ -263,7 +335,8 @@ func (r *EvalHubEvaluationFailedKueueWorkloadsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
"workload", wl.Name, "workloadNamespace", wl.Namespace, "queue", wl.Spec.QueueName,
"job", job.Name, "jobUid", string(job.UID),
"evalJobID", jobID, "providerID", providerID, "benchmarkID", benchmarkID,
"conditionType", cond.Type, "conditionReason", cond.Reason)...)
"conditionType", cond.Type, "conditionReason", cond.Reason,
"messageCode", failureCode)...)

return ctrl.Result{}, nil
}
131 changes: 131 additions & 0 deletions controllers/evalhub/evaluation_failed_kueue_workloads_reconciler_test.go
100644 → 100755
@@ -12,6 +12,8 @@ import (
. "github.com/onsi/gomega"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -145,4 +147,133 @@ var _ = Describe("Kueue failed workload reconciler helpers", func() {
Expect(isEvalHubEvaluationJob(j)).To(BeFalse())
})
})

Describe("jobRequestsGPU", func() {
It("returns true when a container requests nvidia.com/gpu", func() {
j := gpuJob("nvidia.com/gpu", "1")
Expect(jobRequestsGPU(j)).To(BeTrue())
})

It("returns true when a container requests amd.com/gpu", func() {
j := gpuJob("amd.com/gpu", "1")
Expect(jobRequestsGPU(j)).To(BeTrue())
})

It("returns false when no GPU resources are requested", func() {
j := &batchv1.Job{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "adapter",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("250m"),
corev1.ResourceMemory: resource.MustParse("512Mi"),
},
},
},
},
},
},
},
}
Expect(jobRequestsGPU(j)).To(BeFalse())
})

It("returns false for an empty job", func() {
Expect(jobRequestsGPU(&batchv1.Job{})).To(BeFalse())
})
})

Describe("kueueConditionMentionsGPU", func() {
It("returns true when message contains /gpu suffix", func() {
Expect(kueueConditionMentionsGPU("insufficient quota for nvidia.com/gpu in flavor default")).To(BeTrue())
})

It("returns true when message contains amd.com/gpu", func() {
Expect(kueueConditionMentionsGPU("insufficient quota for amd.com/gpu in cluster queue")).To(BeTrue())
})

It("returns false when message has no GPU reference", func() {
Expect(kueueConditionMentionsGPU("ClusterQueue foo is stopped")).To(BeFalse())
})

It("returns false for empty message", func() {
Expect(kueueConditionMentionsGPU("")).To(BeFalse())
})
})

Describe("classifyKueueAdmissionFailure", func() {
It("returns gpu_unavailable code when job requests GPU and condition mentions GPU", func() {
job := gpuJob("nvidia.com/gpu", "1")
cond := &metav1.Condition{
Type: "QuotaReserved",
Status: metav1.ConditionFalse,
Reason: "Inadmissible",
Message: "insufficient quota for nvidia.com/gpu in flavor default, requested: 1, used: 0, borrowable: 0",
}
msg, code := classifyKueueAdmissionFailure(job, cond)
Expect(code).To(Equal(messageCodeGPUUnavailable))
Expect(msg).NotTo(BeEmpty())
Expect(msg).NotTo(ContainSubstring("nvidia.com/gpu"))
Expect(msg).NotTo(ContainSubstring("flavor"))
})

It("returns queue_error code when job requests GPU but condition does not mention GPU", func() {
job := gpuJob("nvidia.com/gpu", "1")
cond := &metav1.Condition{
Type: "QuotaReserved",
Status: metav1.ConditionFalse,
Reason: "Inadmissible",
Message: "ClusterQueue foo is stopped",
}
_, code := classifyKueueAdmissionFailure(job, cond)
Expect(code).To(Equal(messageCodeQueueError))
})

It("returns queue_error code for a CPU-only job even if condition mentions GPU", func() {
job := &batchv1.Job{}
cond := &metav1.Condition{
Message: "insufficient quota for nvidia.com/gpu",
}
_, code := classifyKueueAdmissionFailure(job, cond)
Expect(code).To(Equal(messageCodeQueueError))
})

It("returns queue_error code for CPU-only job with non-GPU failure", func() {
job := &batchv1.Job{}
cond := &metav1.Condition{
Message: "insufficient quota for cpu",
}
_, code := classifyKueueAdmissionFailure(job, cond)
Expect(code).To(Equal(messageCodeQueueError))
})
})
})

// gpuJob builds a minimal batchv1.Job that requests the given GPU resource.
func gpuJob(resourceName, quantity string) *batchv1.Job {
return &batchv1.Job{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "adapter",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName(resourceName): resource.MustParse(quantity),
},
Limits: corev1.ResourceList{
corev1.ResourceName(resourceName): resource.MustParse(quantity),
},
},
},
},
},
},
},
}
}