From d2e33f663d7093b8e33ec33684adac4564605bd0 Mon Sep 17 00:00:00 2001
From: Fangyin Cheng
Date: Mon, 2 Feb 2026 19:58:09 +0800
Subject: [PATCH 1/3] fix: Fix volcano podgroup stuck in inqueue state after rayjob completes

---
 .../kuberay-operator/templates/_helpers.tpl   |   9 +
 .../ray/batchscheduler/interface/interface.go |   9 +
 .../kai-scheduler/kai_scheduler.go            |   5 +
 .../scheduler-plugins/scheduler_plugins.go    |   5 +
 .../volcano/volcano_scheduler.go              |  77 ++++++++
 .../volcano/volcano_scheduler_test.go         | 170 ++++++++++++++++++
 .../yunikorn/yunikorn_scheduler.go            |   5 +
 .../controllers/ray/rayjob_controller.go      |  11 ++
 8 files changed, 291 insertions(+)

diff --git a/helm-chart/kuberay-operator/templates/_helpers.tpl b/helm-chart/kuberay-operator/templates/_helpers.tpl
index 0af59ec4d73..a0ccdaba8df 100644
--- a/helm-chart/kuberay-operator/templates/_helpers.tpl
+++ b/helm-chart/kuberay-operator/templates/_helpers.tpl
@@ -356,8 +356,17 @@ rules:
   - delete
   - get
   - list
+  - patch
   - update
   - watch
+- apiGroups:
+  - scheduling.volcano.sh
+  resources:
+  - podgroups/status
+  verbs:
+  - get
+  - patch
+  - update
 {{- end -}}
 {{- if or .batchSchedulerEnabled (eq .batchSchedulerName "scheduler-plugins") }}
 - apiGroups:
diff --git a/ray-operator/controllers/ray/batchscheduler/interface/interface.go b/ray-operator/controllers/ray/batchscheduler/interface/interface.go
index 8f93a938a39..36f28e1e43e 100644
--- a/ray-operator/controllers/ray/batchscheduler/interface/interface.go
+++ b/ray-operator/controllers/ray/batchscheduler/interface/interface.go
@@ -23,6 +23,11 @@ type BatchScheduler interface {
 	// AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
 	// For example, setting labels for queues / priority, and setting schedulerName.
 	AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)
+
+	// CleanupOnCompletion handles cleanup when the RayJob reaches terminal state (Complete/Failed).
+	// For batch schedulers like Volcano, this deletes the PodGroup to release queue resources.
+	// This is a no-op for schedulers that don't need cleanup.
+	CleanupOnCompletion(ctx context.Context, object metav1.Object) error
 }
 
 // BatchSchedulerFactory handles initial setup of the scheduler plugin by registering the
@@ -58,6 +63,10 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context,
 func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
 }
 
+func (d *DefaultBatchScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
+	return nil
+}
+
 func (df *DefaultBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (BatchScheduler, error) {
 	return &DefaultBatchScheduler{}, nil
 }
diff --git a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go
index befa61a727e..0c1e4f63ddd 100644
--- a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go
@@ -63,6 +63,11 @@ func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent me
 	child.SetLabels(childLabels)
 }
 
+func (k *KaiScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
+	// KaiScheduler doesn't need cleanup
+	return nil
+}
+
 func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
 	return &KaiScheduler{}, nil
 }
diff --git a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go
index 449c9d5dbcb..96f74028a39 100644
--- a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go
+++ b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go
@@ -113,6 +113,11 @@ func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
 	return exist
 }
 
+func (k *KubeScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
+	// KubeScheduler doesn't need cleanup
+	return nil
+}
+
 func (kf *KubeSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
 	if err := v1alpha1.AddToScheme(cli.Scheme()); err != nil {
 		return nil, err
diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go
index 8f29864c266..5a6f53445f4 100644
--- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go
@@ -252,6 +252,83 @@ func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, pa
 	addSchedulerName(child, v.Name())
 }
 
+// CleanupOnCompletion deletes the PodGroup when RayJob finishes.
+// This is called when the RayJob reaches terminal state (Complete/Failed).
+//
+// Why delete instead of marking as Completed?
+//
+// The Volcano scheduler runs a continuous control loop that recalculates and updates
+// PodGroup status in every scheduling cycle (see volcano/pkg/scheduler/framework/job_updater.go:116).
+// The status calculation logic (getPodGroupPhase in session.go:614) works as follows:
+//
+// 1. If scheduled pods < MinMember → return Pending
+// 2. If scheduled pods >= MinMember and all completed → return Completed
+// 3. If scheduled pods >= MinMember and some running → return Running
+// 4. If current status is Inqueue → return Inqueue (preserve it)
+// 5. Otherwise → return Pending
+//
+// When RayJob finishes and RayCluster is deleted:
+// - All pods are deleted, so scheduled = 0
+// - Since scheduled (0) < MinMember (e.g., 3), condition #1 applies
+// - The function returns Pending
+// - The enqueue action then changes Pending to Inqueue (enqueue.go:97)
+//
+// Therefore, marking the PodGroup as Completed doesn't work because:
+// 1. We set status to Completed
+// 2. Volcano scheduler runs its next cycle
+// 3. jobStatus() recalculates: "scheduled < MinMember, so status = Pending"
+// 4. enqueue action sees Pending and changes it to Inqueue
+// 5. User sees PodGroup stuck in Inqueue state, queue resources NOT released
+//
+// By deleting the PodGroup entirely:
+// - Volcano scheduler can't find the PodGroup (NotFound)
+// - Skips all status updates for this PodGroup
+// - Queue resources are immediately and permanently released
+//
+// See: https://github.com/volcano-sh/volcano/blob/master/pkg/scheduler/framework/job_updater.go
+//
+// https://github.com/volcano-sh/volcano/blob/master/pkg/scheduler/framework/session.go
+func (v *VolcanoBatchScheduler) CleanupOnCompletion(ctx context.Context, object metav1.Object) error {
+	logger := ctrl.LoggerFrom(ctx).WithName(pluginName)
+
+	// Only handle RayJob. RayCluster PodGroups will be cleaned up via OwnerReference
+	rayJob, ok := object.(*rayv1.RayJob)
+	if !ok {
+		return nil
+	}
+
+	podGroupName := getAppPodGroupName(rayJob)
+	podGroup := volcanoschedulingv1beta1.PodGroup{}
+
+	if err := v.cli.Get(ctx, types.NamespacedName{
+		Namespace: rayJob.Namespace,
+		Name:      podGroupName,
+	}, &podGroup); err != nil {
+		if errors.IsNotFound(err) {
+			logger.Info("PodGroup not found, already deleted", "podGroupName", podGroupName)
+			return nil
+		}
+		logger.Error(err, "failed to get PodGroup", "podGroupName", podGroupName)
+		return err
+	}
+
+	// Delete the PodGroup to immediately release queue resources
+	// We use client.Delete (not background) to ensure proper cleanup
+	if err := v.cli.Delete(ctx, &podGroup); err != nil {
+		// If already deleted, that's fine
+		if errors.IsNotFound(err) {
+			logger.Info("PodGroup already deleted", "podGroupName", podGroupName)
+			return nil
+		}
+		logger.Error(err, "failed to delete PodGroup", "podGroupName", podGroupName)
+		return err
+	}
+
+	logger.Info("PodGroup deleted to release queue resources", "podGroupName", podGroupName,
+		"minMember", podGroup.Spec.MinMember)
+	return nil
+}
+
 func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
 	if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil {
 		return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)
diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go
index f7c1cadda87..875990b959c 100644
--- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go
+++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -512,3 +513,172 @@ func TestGetAppPodGroupName(t *testing.T) {
 	rayJob := createTestRayJob(1)
 	a.Equal("ray-rayjob-sample-pg", getAppPodGroupName(&rayJob))
 }
+
+func TestCleanupOnCompletion(t *testing.T) {
+	a := assert.New(t)
+	require := require.New(t)
+
+	t.Run("RayJob - delete PodGroup", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Manually create a PodGroup in Pending state to simulate the real scenario
+		podGroupName := getAppPodGroupName(&rayJob)
+		pg := &volcanoschedulingv1beta1.PodGroup{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      podGroupName,
+				Namespace: rayJob.Namespace,
+			},
+			Spec: volcanoschedulingv1beta1.PodGroupSpec{
+				MinMember: 3,
+			},
+			Status: volcanoschedulingv1beta1.PodGroupStatus{
+				Phase:   volcanoschedulingv1beta1.PodGroupPending,
+				Running: 1,
+			},
+		}
+		err := fakeCli.Create(ctx, pg)
+		require.NoError(err)
+
+		// Verify PodGroup was created in Pending state
+		var retrievedPg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
+		require.NoError(err)
+		a.Equal(volcanoschedulingv1beta1.PodGroupPending, retrievedPg.Status.Phase)
+
+		// Now call CleanupOnCompletion to simulate RayJob finishing
+		err = scheduler.CleanupOnCompletion(ctx, &rayJob)
+		require.NoError(err)
+
+		// Verify PodGroup was deleted
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
+		require.Error(err)
+		a.True(errors.IsNotFound(err))
+	})
+
+	t.Run("RayJob - PodGroup not found (already deleted)", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Don't create a PodGroup, just call CleanupOnCompletion
+		err := scheduler.CleanupOnCompletion(ctx, &rayJob)
+		// Should not return an error, just log that PodGroup was not found
+		require.NoError(err)
+	})
+
+	t.Run("RayCluster - should be no-op", func(_ *testing.T) {
+		rayCluster := createTestRayCluster(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Call CleanupOnCompletion with RayCluster - should be no-op
+		err := scheduler.CleanupOnCompletion(ctx, &rayCluster)
+		require.NoError(err)
+
+		// Verify no PodGroup was created (RayCluster PodGroups are not managed by this method)
+		var pg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayCluster.Namespace, Name: getAppPodGroupName(&rayCluster)}, &pg)
+		require.Error(err) // Should not be found
+		a.True(errors.IsNotFound(err))
+	})
+
+	t.Run("RayJob - PodGroup in Inqueue state (bug scenario)", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Create a PodGroup in Inqueue state to simulate the bug scenario
+		podGroupName := getAppPodGroupName(&rayJob)
+		pg := &volcanoschedulingv1beta1.PodGroup{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      podGroupName,
+				Namespace: rayJob.Namespace,
+			},
+			Spec: volcanoschedulingv1beta1.PodGroupSpec{
+				MinMember: 31,
+			},
+			Status: volcanoschedulingv1beta1.PodGroupStatus{
+				Phase:     volcanoschedulingv1beta1.PodGroupInqueue,
+				Running:   0,
+				Succeeded: 1,
+			},
+		}
+		err := fakeCli.Create(ctx, pg)
+		require.NoError(err)
+
+		// Call CleanupOnCompletion
+		err = scheduler.CleanupOnCompletion(ctx, &rayJob)
+		require.NoError(err)
+
+		// Verify PodGroup was deleted
+		var retrievedPg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
+		require.Error(err)
+		a.True(errors.IsNotFound(err))
+	})
+
+	t.Run("RayJob - idempotent (can call multiple times)", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Create a PodGroup
+		podGroupName := getAppPodGroupName(&rayJob)
+		pg := &volcanoschedulingv1beta1.PodGroup{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      podGroupName,
+				Namespace: rayJob.Namespace,
+			},
+			Spec: volcanoschedulingv1beta1.PodGroupSpec{
+				MinMember: 3,
+			},
+			Status: volcanoschedulingv1beta1.PodGroupStatus{
+				Phase:   volcanoschedulingv1beta1.PodGroupPending,
+				Running: 1,
+			},
+		}
+		err := fakeCli.Create(ctx, pg)
+		require.NoError(err)
+
+		// Call CleanupOnCompletion first time
+		err = scheduler.CleanupOnCompletion(ctx, &rayJob)
+		require.NoError(err)
+
+		// Verify PodGroup was deleted
+		var retrievedPg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
+		require.Error(err)
+		a.True(errors.IsNotFound(err))
+
+		// Call CleanupOnCompletion second time - should not error
+		err = scheduler.CleanupOnCompletion(ctx, &rayJob)
+		require.NoError(err)
+	})
+}
diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go
index 9eb7e31b07c..5d9fd5ab129 100644
--- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go
@@ -136,6 +136,11 @@ func (y *YuniKornScheduler) AddMetadataToChildResource(ctx context.Context, pare
 	}
 }
 
+func (y *YuniKornScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
+	// YuniKorn doesn't need cleanup
+	return nil
+}
+
 func (yf *YuniKornSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
 	return &YuniKornScheduler{}, nil
 }
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index b5492f93115..a91a8dcc1a9 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -408,6 +408,17 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 		// The RayJob is already suspended, we should not requeue it.
 		return ctrl.Result{}, nil
 	case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
+		// Clean up batch scheduler resources (e.g., delete Volcano PodGroup)
+		// This should be done before other deletion logic to ensure proper resource cleanup
+		if r.options.BatchSchedulerManager != nil {
+			if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
+				if err := scheduler.CleanupOnCompletion(ctx, rayJobInstance); err != nil {
+					logger.Error(err, "Failed to cleanup batch scheduler resources")
+					// Don't block the reconciliation on cleanup failures, just log the error
+				}
+			}
+		}
+
 		// The RayJob has reached a terminal state. Handle the cleanup and deletion logic.
 		// If the RayJob uses an existing RayCluster, we must not delete it.
 		if len(rayJobInstance.Spec.ClusterSelector) > 0 {

From d968b90a68fd01c7f9c501bb8aed872a9968cc15 Mon Sep 17 00:00:00 2001
From: Fangyin Cheng
Date: Mon, 2 Feb 2026 20:33:31 +0800
Subject: [PATCH 2/3] fix: fix rayjob controller error handling

---
 ray-operator/controllers/ray/rayjob_controller.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index a91a8dcc1a9..acffca159ed 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -411,7 +411,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 		// Clean up batch scheduler resources (e.g., delete Volcano PodGroup)
 		// This should be done before other deletion logic to ensure proper resource cleanup
 		if r.options.BatchSchedulerManager != nil {
-			if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
+			scheduler, err := r.options.BatchSchedulerManager.GetScheduler()
+			if err != nil {
+				logger.Error(err, "Failed to get batch scheduler")
+				// Don't block the reconciliation on scheduler errors, just log the error
+			} else {
 				if err := scheduler.CleanupOnCompletion(ctx, rayJobInstance); err != nil {
 					logger.Error(err, "Failed to cleanup batch scheduler resources")
 					// Don't block the reconciliation on cleanup failures, just log the error

From cc26203709628612c5304bf2c78d13dd9639fedd Mon Sep 17 00:00:00 2001
From: Fangyin Cheng
Date: Wed, 4 Feb 2026 05:15:00 +0800
Subject: [PATCH 3/3] chore: remove unused permissions

---
 helm-chart/kuberay-operator/templates/_helpers.tpl | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/helm-chart/kuberay-operator/templates/_helpers.tpl b/helm-chart/kuberay-operator/templates/_helpers.tpl
index a0ccdaba8df..0af59ec4d73 100644
--- a/helm-chart/kuberay-operator/templates/_helpers.tpl
+++ b/helm-chart/kuberay-operator/templates/_helpers.tpl
@@ -356,17 +356,8 @@ rules:
   - delete
   - get
   - list
-  - patch
   - update
   - watch
-- apiGroups:
-  - scheduling.volcano.sh
-  resources:
-  - podgroups/status
-  verbs:
-  - get
-  - patch
-  - update
 {{- end -}}
 {{- if or .batchSchedulerEnabled (eq .batchSchedulerName "scheduler-plugins") }}
 - apiGroups: