diff --git a/ray-operator/controllers/ray/batchscheduler/interface/interface.go b/ray-operator/controllers/ray/batchscheduler/interface/interface.go
index 8f93a938a39..36f28e1e43e 100644
--- a/ray-operator/controllers/ray/batchscheduler/interface/interface.go
+++ b/ray-operator/controllers/ray/batchscheduler/interface/interface.go
@@ -23,6 +23,11 @@ type BatchScheduler interface {
 	// AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
 	// For example, setting labels for queues / priority, and setting schedulerName.
 	AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)
+
+	// CleanupOnCompletion handles cleanup when the RayJob reaches a terminal state (Complete/Failed).
+	// For batch schedulers like Volcano, this deletes the PodGroup to release queue resources.
+	// This is a no-op for schedulers that don't need cleanup.
+	CleanupOnCompletion(ctx context.Context, object metav1.Object) error
 }
 
 // BatchSchedulerFactory handles initial setup of the scheduler plugin by registering the
@@ -58,6 +63,10 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context,
 func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
 }
 
+func (d *DefaultBatchScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
+	return nil
+}
+
 func (df *DefaultBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (BatchScheduler, error) {
 	return &DefaultBatchScheduler{}, nil
 }
diff --git a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go
index befa61a727e..0c1e4f63ddd 100644
--- a/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go
@@ -63,6 +63,11 @@ func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent me
 	child.SetLabels(childLabels)
 }
 
+func (k *KaiScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
+	// KaiScheduler doesn't need cleanup
+	return nil
+}
+
 func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
 	return &KaiScheduler{}, nil
 }
diff --git a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go
index 449c9d5dbcb..96f74028a39 100644
--- a/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go
+++ b/ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go
@@ -113,6 +113,11 @@ func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
 	return exist
 }
 
+func (k *KubeScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
+	// KubeScheduler doesn't need cleanup
+	return nil
+}
+
 func (kf *KubeSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
 	if err := v1alpha1.AddToScheme(cli.Scheme()); err != nil {
 		return nil, err
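CleanupOnCompletion is now part of the BatchScheduler interface, so every plugin must implement it or the operator stops compiling. A minimal sketch of a compile-time check that makes this contract explicit; it is not part of this diff, and it assumes placement next to interface.go in the interface package (package name inferred from the schedulerinterface import alias used elsewhere in this diff):

// Hypothetical compile-time assertion: the build fails if DefaultBatchScheduler
// (or any plugin given an analogous line) stops satisfying BatchScheduler,
// including the new CleanupOnCompletion method.
var _ BatchScheduler = (*DefaultBatchScheduler)(nil)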
diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go
index 8f29864c266..5a6f53445f4 100644
--- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go
@@ -252,6 +252,83 @@ func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, pa
 	addSchedulerName(child, v.Name())
 }
 
+// CleanupOnCompletion deletes the PodGroup when the RayJob finishes.
+// It is called when the RayJob reaches a terminal state (Complete/Failed).
+//
+// Why delete the PodGroup instead of marking it Completed?
+//
+// The Volcano scheduler runs a continuous control loop that recalculates and updates
+// PodGroup status in every scheduling cycle (see volcano/pkg/scheduler/framework/job_updater.go:116).
+// The status calculation logic (getPodGroupPhase in session.go:614) works as follows:
+//
+// 1. If scheduled pods < MinMember → return Pending
+// 2. If scheduled pods >= MinMember and all completed → return Completed
+// 3. If scheduled pods >= MinMember and some running → return Running
+// 4. If the current status is Inqueue → return Inqueue (preserve it)
+// 5. Otherwise → return Pending
+//
+// When the RayJob finishes and the RayCluster is deleted:
+// - All pods are deleted, so scheduled = 0
+// - Since scheduled (0) < MinMember (e.g., 3), condition #1 applies
+// - The function returns Pending
+// - The enqueue action then changes Pending to Inqueue (enqueue.go:97)
+//
+// Therefore, marking the PodGroup as Completed doesn't work because:
+// 1. We set the status to Completed
+// 2. The Volcano scheduler runs its next cycle
+// 3. jobStatus() recalculates: "scheduled < MinMember, so status = Pending"
+// 4. The enqueue action sees Pending and changes it to Inqueue
+// 5. The user sees the PodGroup stuck in Inqueue and its queue resources are never released
+//
+// By deleting the PodGroup entirely:
+// - The Volcano scheduler can't find the PodGroup (NotFound)
+// - It skips all status updates for this PodGroup
+// - Queue resources are immediately and permanently released
+//
+// See: https://github.com/volcano-sh/volcano/blob/master/pkg/scheduler/framework/job_updater.go
+//
+// https://github.com/volcano-sh/volcano/blob/master/pkg/scheduler/framework/session.go
+func (v *VolcanoBatchScheduler) CleanupOnCompletion(ctx context.Context, object metav1.Object) error {
+	logger := ctrl.LoggerFrom(ctx).WithName(pluginName)
+
+	// Only handle RayJob. RayCluster PodGroups are cleaned up via their OwnerReference.
+	rayJob, ok := object.(*rayv1.RayJob)
+	if !ok {
+		return nil
+	}
+
+	podGroupName := getAppPodGroupName(rayJob)
+	podGroup := volcanoschedulingv1beta1.PodGroup{}
+
+	if err := v.cli.Get(ctx, types.NamespacedName{
+		Namespace: rayJob.Namespace,
+		Name:      podGroupName,
+	}, &podGroup); err != nil {
+		if errors.IsNotFound(err) {
+			logger.Info("PodGroup not found, already deleted", "podGroupName", podGroupName)
+			return nil
+		}
+		logger.Error(err, "failed to get PodGroup", "podGroupName", podGroupName)
+		return err
+	}
+
+	// Delete the PodGroup so the queue resources it reserves are released immediately;
+	// once it is gone, the Volcano scheduler has nothing left to re-enqueue.
+	if err := v.cli.Delete(ctx, &podGroup); err != nil {
+		// If it is already deleted, that's fine
+		if errors.IsNotFound(err) {
+			logger.Info("PodGroup already deleted", "podGroupName", podGroupName)
+			return nil
+		}
+		logger.Error(err, "failed to delete PodGroup", "podGroupName", podGroupName)
+		return err
+	}
+
+	logger.Info("PodGroup deleted to release queue resources", "podGroupName", podGroupName,
+		"minMember", podGroup.Spec.MinMember)
+	return nil
+}
+
 func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
 	if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil {
 		return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)
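The doc comment above describes Volcano's status recalculation in prose. The sketch below restates that decision table as standalone Go so the failure mode is concrete; it is a paraphrase of the behaviour described in the comment, with illustrative names, not Volcano's actual getPodGroupPhase code.

package main

import "fmt"

// phase stands in for the PodGroup phases named in the comment above.
type phase string

const (
	pending   phase = "Pending"
	running   phase = "Running"
	completed phase = "Completed"
	inqueue   phase = "Inqueue"
)

// recalc mirrors the five rules from the doc comment: this is what the scheduler
// is described as recomputing on every cycle, regardless of what we last wrote.
func recalc(scheduled, minMember int, allCompleted, someRunning bool, current phase) phase {
	switch {
	case scheduled < minMember:
		return pending // rule 1: gang minimum not met
	case allCompleted:
		return completed // rule 2
	case someRunning:
		return running // rule 3
	case current == inqueue:
		return inqueue // rule 4: preserve Inqueue
	default:
		return pending // rule 5
	}
}

func main() {
	// After the RayJob finishes and the RayCluster is deleted, no pods remain:
	// scheduled = 0 < MinMember, so the phase falls back to Pending even if we had
	// just written Completed, and the enqueue action then flips it to Inqueue.
	fmt.Println(recalc(0, 3, false, false, completed)) // prints "Pending"
}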
diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go
index f7c1cadda87..875990b959c 100644
--- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go
+++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -512,3 +513,172 @@ func TestGetAppPodGroupName(t *testing.T) {
 	rayJob := createTestRayJob(1)
 	a.Equal("ray-rayjob-sample-pg", getAppPodGroupName(&rayJob))
 }
+
+func TestCleanupOnCompletion(t *testing.T) {
+	a := assert.New(t)
+	require := require.New(t)
+
+	t.Run("RayJob - delete PodGroup", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Manually create a PodGroup in Pending state to simulate the real scenario
+		podGroupName := getAppPodGroupName(&rayJob)
+		pg := &volcanoschedulingv1beta1.PodGroup{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      podGroupName,
+				Namespace: rayJob.Namespace,
+			},
+			Spec: volcanoschedulingv1beta1.PodGroupSpec{
+				MinMember: 3,
+			},
+			Status: volcanoschedulingv1beta1.PodGroupStatus{
+				Phase:   volcanoschedulingv1beta1.PodGroupPending,
+				Running: 1,
+			},
+		}
+		err := fakeCli.Create(ctx, pg)
+		require.NoError(err)
+
+		// Verify PodGroup was created in Pending state
+		var retrievedPg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
+		require.NoError(err)
+		a.Equal(volcanoschedulingv1beta1.PodGroupPending, retrievedPg.Status.Phase)
+
+		// Now call CleanupOnCompletion to simulate the RayJob finishing
+		err = scheduler.CleanupOnCompletion(ctx, &rayJob)
+		require.NoError(err)
+
+		// Verify PodGroup was deleted
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
+		require.Error(err)
+		a.True(errors.IsNotFound(err))
+	})
+
+	t.Run("RayJob - PodGroup not found (already deleted)", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Don't create a PodGroup, just call CleanupOnCompletion
+		err := scheduler.CleanupOnCompletion(ctx, &rayJob)
+		// Should not return an error, just log that the PodGroup was not found
+		require.NoError(err)
+	})
+
+	t.Run("RayCluster - should be no-op", func(_ *testing.T) {
+		rayCluster := createTestRayCluster(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Call CleanupOnCompletion with RayCluster - should be no-op
+		err := scheduler.CleanupOnCompletion(ctx, &rayCluster)
+		require.NoError(err)
+
+		// Verify no PodGroup was created (RayCluster PodGroups are not managed by this method)
+		var pg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayCluster.Namespace, Name: getAppPodGroupName(&rayCluster)}, &pg)
+		require.Error(err) // Should not be found
+		a.True(errors.IsNotFound(err))
+	})
+
+	t.Run("RayJob - PodGroup in Inqueue state (bug scenario)", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Create a PodGroup in Inqueue state to simulate the bug scenario
+		podGroupName := getAppPodGroupName(&rayJob)
+		pg := &volcanoschedulingv1beta1.PodGroup{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      podGroupName,
+				Namespace: rayJob.Namespace,
+			},
+			Spec: volcanoschedulingv1beta1.PodGroupSpec{
+				MinMember: 31,
+			},
+			Status: volcanoschedulingv1beta1.PodGroupStatus{
+				Phase:     volcanoschedulingv1beta1.PodGroupInqueue,
+				Running:   0,
+				Succeeded: 1,
+			},
+		}
+		err := fakeCli.Create(ctx, pg)
+		require.NoError(err)
+
+		// Call CleanupOnCompletion
+		err = scheduler.CleanupOnCompletion(ctx, &rayJob)
+		require.NoError(err)
+
+		// Verify PodGroup was deleted
+		var retrievedPg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
+		require.Error(err)
+		a.True(errors.IsNotFound(err))
+	})
+
+	t.Run("RayJob - idempotent (can call multiple times)", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		scheme := runtime.NewScheme()
+		a.NoError(rayv1.AddToScheme(scheme))
+		a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+		fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
+		scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+		ctx := context.Background()
+
+		// Create a PodGroup
+		podGroupName := getAppPodGroupName(&rayJob)
+		pg := &volcanoschedulingv1beta1.PodGroup{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      podGroupName,
+				Namespace: rayJob.Namespace,
+			},
+			Spec: volcanoschedulingv1beta1.PodGroupSpec{
+				MinMember: 3,
+			},
+			Status: volcanoschedulingv1beta1.PodGroupStatus{
+				Phase:   volcanoschedulingv1beta1.PodGroupPending,
+				Running: 1,
+			},
+		}
+		err := fakeCli.Create(ctx, pg)
+		require.NoError(err)
+
+		// Call CleanupOnCompletion the first time
+		err = scheduler.CleanupOnCompletion(ctx, &rayJob)
+		require.NoError(err)
+
+		// Verify PodGroup was deleted
+		var retrievedPg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
+		require.Error(err)
+		a.True(errors.IsNotFound(err))
+
+		// Call CleanupOnCompletion a second time - should not error
+		err = scheduler.CleanupOnCompletion(ctx, &rayJob)
+		require.NoError(err)
+	})
+}
diff --git a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go
index 9eb7e31b07c..5d9fd5ab129 100644
--- a/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go
@@ -136,6 +136,11 @@ func (y *YuniKornScheduler) AddMetadataToChildResource(ctx context.Context, pare
 	}
 }
 
+func (y *YuniKornScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
+	// YuniKorn doesn't need cleanup
+	return nil
+}
+
 func (yf *YuniKornSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
 	return &YuniKornScheduler{}, nil
 }
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index b5492f93115..acffca159ed 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -408,6 +408,21 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 		// The RayJob is already suspended, we should not requeue it.
 		return ctrl.Result{}, nil
 	case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
+		// Clean up batch scheduler resources (e.g., delete the Volcano PodGroup).
+		// Do this before the deletion logic below so that queue resources are released promptly.
+		if r.options.BatchSchedulerManager != nil {
+			scheduler, err := r.options.BatchSchedulerManager.GetScheduler()
+			if err != nil {
+				logger.Error(err, "Failed to get batch scheduler")
+				// Don't block the reconciliation on scheduler errors, just log the error
+			} else {
+				if err := scheduler.CleanupOnCompletion(ctx, rayJobInstance); err != nil {
+					logger.Error(err, "Failed to clean up batch scheduler resources")
+					// Don't block the reconciliation on cleanup failures, just log the error
+				}
+			}
+		}
+
 		// The RayJob has reached a terminal state. Handle the cleanup and deletion logic.
 		// If the RayJob uses an existing RayCluster, we must not delete it.
 		if len(rayJobInstance.Spec.ClusterSelector) > 0 {
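To exercise the new terminal-state branch in rayjob_controller.go without a real scheduler backend, a test double can embed DefaultBatchScheduler and simply record cleanup calls. This is a sketch under assumptions, not part of the PR: the stub name is made up, the import path is inferred from the file paths in this diff, and how the reconciler is handed the stub is left out.

package ray

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
)

// countingScheduler embeds DefaultBatchScheduler so it satisfies the full
// BatchScheduler interface, and overrides only CleanupOnCompletion to count
// invocations and optionally inject a failure.
type countingScheduler struct {
	schedulerinterface.DefaultBatchScheduler
	cleanupCalls int
	cleanupErr   error
}

func (s *countingScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
	s.cleanupCalls++
	// Even when this returns an error, the controller only logs it and moves on,
	// matching the "don't block the reconciliation" comments in the diff above.
	return s.cleanupErr
}

var _ schedulerinterface.BatchScheduler = (*countingScheduler)(nil)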