@@ -23,6 +23,11 @@ type BatchScheduler interface {
// AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
// For example, setting labels for queues / priority, and setting schedulerName.
AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)

// CleanupOnCompletion handles cleanup when the RayJob reaches a terminal state (Complete/Failed).
// For batch schedulers like Volcano, this deletes the PodGroup to release queue resources.
// This is a no-op for schedulers that don't need cleanup.
CleanupOnCompletion(ctx context.Context, object metav1.Object) error
}

// BatchSchedulerFactory handles initial setup of the scheduler plugin by registering the
@@ -58,6 +63,10 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context,
func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
}

func (d *DefaultBatchScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
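// The default scheduler doesn't need cleanup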
return nil
}

func (df *DefaultBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (BatchScheduler, error) {
return &DefaultBatchScheduler{}, nil
}
@@ -63,6 +63,11 @@ func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent me
child.SetLabels(childLabels)
}

func (k *KaiScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
// KaiScheduler doesn't need cleanup
return nil
}

func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
return &KaiScheduler{}, nil
}
@@ -113,6 +113,11 @@ func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
return exist
}

func (k *KubeScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
// KubeScheduler doesn't need cleanup
return nil
}

func (kf *KubeSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
if err := v1alpha1.AddToScheme(cli.Scheme()); err != nil {
return nil, err
@@ -252,6 +252,83 @@ func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, pa
addSchedulerName(child, v.Name())
}

// CleanupOnCompletion deletes the PodGroup when the RayJob finishes.
// It is called when the RayJob reaches a terminal state (Complete/Failed).
//
// Why delete instead of marking as Completed?
//
// The Volcano scheduler runs a continuous control loop that recalculates and updates
// PodGroup status in every scheduling cycle (see volcano/pkg/scheduler/framework/job_updater.go:116).
// The status calculation logic (getPodGroupPhase in session.go:614) works as follows:
//
// 1. If scheduled pods < MinMember → return Pending
// 2. If scheduled pods >= MinMember and all completed → return Completed
// 3. If scheduled pods >= MinMember and some running → return Running
// 4. If current status is Inqueue → return Inqueue (preserve it)
// 5. Otherwise → return Pending
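//
// A condensed sketch of that recalculation (hand-written pseudocode based on the
// description above, not Volcano's actual code; all names here are illustrative):
//
//	func recalculatedPhase(scheduled, minMember int32, allCompleted, anyRunning bool, current Phase) Phase {
//		switch {
//		case scheduled < minMember:
//			return Pending // #1: not enough pods scheduled
//		case allCompleted:
//			return Completed // #2
//		case anyRunning:
//			return Running // #3
//		case current == Inqueue:
//			return Inqueue // #4: preserve Inqueue
//		default:
//			return Pending // #5
//		}
//	}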
//
// When RayJob finishes and RayCluster is deleted:
// - All pods are deleted, so scheduled = 0
// - Since scheduled (0) < MinMember (e.g., 3), condition #1 applies
// - The function returns Pending
// - The enqueue action then changes Pending to Inqueue (enqueue.go:97)
//
// Therefore, marking the PodGroup as Completed doesn't work because:
// 1. We set status to Completed
// 2. Volcano scheduler runs its next cycle
// 3. jobStatus() recalculates: "scheduled < MinMember, so status = Pending"
// 4. enqueue action sees Pending and changes it to Inqueue
// 5. User sees PodGroup stuck in Inqueue state, queue resources NOT released
//
// By deleting the PodGroup entirely:
// - Volcano scheduler can't find the PodGroup (NotFound)
// - Skips all status updates for this PodGroup
// - Queue resources are immediately and permanently released
//
// See: https://github.com/volcano-sh/volcano/blob/master/pkg/scheduler/framework/job_updater.go
//
// https://github.com/volcano-sh/volcano/blob/master/pkg/scheduler/framework/session.go
func (v *VolcanoBatchScheduler) CleanupOnCompletion(ctx context.Context, object metav1.Object) error {
logger := ctrl.LoggerFrom(ctx).WithName(pluginName)

// Only handle RayJob. RayCluster PodGroups will be cleaned up via OwnerReference
rayJob, ok := object.(*rayv1.RayJob)
if !ok {
return nil
}

podGroupName := getAppPodGroupName(rayJob)
podGroup := volcanoschedulingv1beta1.PodGroup{}

if err := v.cli.Get(ctx, types.NamespacedName{
Namespace: rayJob.Namespace,
Name: podGroupName,
}, &podGroup); err != nil {
if errors.IsNotFound(err) {
logger.Info("PodGroup not found, already deleted", "podGroupName", podGroupName)
return nil
}
logger.Error(err, "failed to get PodGroup", "podGroupName", podGroupName)
return err
}

// Delete the PodGroup to immediately release queue resources.
// Deleting (rather than marking the PodGroup Completed) prevents the Volcano scheduler from
// recalculating its phase and re-enqueuing it, as described above.
if err := v.cli.Delete(ctx, &podGroup); err != nil {
// If already deleted, that's fine
if errors.IsNotFound(err) {
logger.Info("PodGroup already deleted", "podGroupName", podGroupName)
return nil
}
logger.Error(err, "failed to delete PodGroup", "podGroupName", podGroupName)
return err
}

logger.Info("PodGroup deleted to release queue resources", "podGroupName", podGroupName,
"minMember", podGroup.Spec.MinMember)
return nil
}

func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil {
return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)
@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -512,3 +513,172 @@ func TestGetAppPodGroupName(t *testing.T) {
rayJob := createTestRayJob(1)
a.Equal("ray-rayjob-sample-pg", getAppPodGroupName(&rayJob))
}

func TestCleanupOnCompletion(t *testing.T) {
a := assert.New(t)
require := require.New(t)

t.Run("RayJob - delete PodGroup", func(_ *testing.T) {
rayJob := createTestRayJob(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Manually create a PodGroup in Pending state to simulate the real scenario
podGroupName := getAppPodGroupName(&rayJob)
pg := &volcanoschedulingv1beta1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: podGroupName,
Namespace: rayJob.Namespace,
},
Spec: volcanoschedulingv1beta1.PodGroupSpec{
MinMember: 3,
},
Status: volcanoschedulingv1beta1.PodGroupStatus{
Phase: volcanoschedulingv1beta1.PodGroupPending,
Running: 1,
},
}
err := fakeCli.Create(ctx, pg)
require.NoError(err)

// Verify PodGroup was created in Pending state
var retrievedPg volcanoschedulingv1beta1.PodGroup
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
require.NoError(err)
a.Equal(volcanoschedulingv1beta1.PodGroupPending, retrievedPg.Status.Phase)

// Now call CleanupOnCompletion to simulate RayJob finishing
err = scheduler.CleanupOnCompletion(ctx, &rayJob)
require.NoError(err)

// Verify PodGroup was deleted
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
require.Error(err)
a.True(errors.IsNotFound(err))
})

t.Run("RayJob - PodGroup not found (already deleted)", func(_ *testing.T) {
rayJob := createTestRayJob(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Don't create a PodGroup, just call CleanupOnCompletion
err := scheduler.CleanupOnCompletion(ctx, &rayJob)
// Should not return an error, just log that PodGroup was not found
require.NoError(err)
})

t.Run("RayCluster - should be no-op", func(_ *testing.T) {
rayCluster := createTestRayCluster(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Call CleanupOnCompletion with RayCluster - should be no-op
err := scheduler.CleanupOnCompletion(ctx, &rayCluster)
require.NoError(err)

// Verify no PodGroup was created (RayCluster PodGroups are not managed by this method)
var pg volcanoschedulingv1beta1.PodGroup
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayCluster.Namespace, Name: getAppPodGroupName(&rayCluster)}, &pg)
require.Error(err) // Should not be found
a.True(errors.IsNotFound(err))
})

t.Run("RayJob - PodGroup in Inqueue state (bug scenario)", func(_ *testing.T) {
rayJob := createTestRayJob(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Create a PodGroup in Inqueue state to simulate the bug scenario
podGroupName := getAppPodGroupName(&rayJob)
pg := &volcanoschedulingv1beta1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: podGroupName,
Namespace: rayJob.Namespace,
},
Spec: volcanoschedulingv1beta1.PodGroupSpec{
MinMember: 31,
},
Status: volcanoschedulingv1beta1.PodGroupStatus{
Phase: volcanoschedulingv1beta1.PodGroupInqueue,
Running: 0,
Succeeded: 1,
},
}
err := fakeCli.Create(ctx, pg)
require.NoError(err)

// Call CleanupOnCompletion
err = scheduler.CleanupOnCompletion(ctx, &rayJob)
require.NoError(err)

// Verify PodGroup was deleted
var retrievedPg volcanoschedulingv1beta1.PodGroup
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
require.Error(err)
a.True(errors.IsNotFound(err))
})

t.Run("RayJob - idempotent (can call multiple times)", func(_ *testing.T) {
rayJob := createTestRayJob(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Create a PodGroup
podGroupName := getAppPodGroupName(&rayJob)
pg := &volcanoschedulingv1beta1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: podGroupName,
Namespace: rayJob.Namespace,
},
Spec: volcanoschedulingv1beta1.PodGroupSpec{
MinMember: 3,
},
Status: volcanoschedulingv1beta1.PodGroupStatus{
Phase: volcanoschedulingv1beta1.PodGroupPending,
Running: 1,
},
}
err := fakeCli.Create(ctx, pg)
require.NoError(err)

// Call CleanupOnCompletion first time
err = scheduler.CleanupOnCompletion(ctx, &rayJob)
require.NoError(err)

// Verify PodGroup was deleted
var retrievedPg volcanoschedulingv1beta1.PodGroup
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
require.Error(err)
a.True(errors.IsNotFound(err))

// Call CleanupOnCompletion second time - should not error
err = scheduler.CleanupOnCompletion(ctx, &rayJob)
require.NoError(err)
})
}
@@ -136,6 +136,11 @@ func (y *YuniKornScheduler) AddMetadataToChildResource(ctx context.Context, pare
}
}

func (y *YuniKornScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
// YuniKorn doesn't need cleanup
return nil
}

func (yf *YuniKornSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
return &YuniKornScheduler{}, nil
}
15 changes: 15 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -408,6 +408,21 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
// The RayJob is already suspended, we should not requeue it.
return ctrl.Result{}, nil
case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
// Clean up batch scheduler resources (e.g., delete Volcano PodGroup)
// This should be done before other deletion logic to ensure proper resource cleanup
if r.options.BatchSchedulerManager != nil {
scheduler, err := r.options.BatchSchedulerManager.GetScheduler()
if err != nil {
logger.Error(err, "Failed to get batch scheduler")
// Don't block the reconciliation on scheduler errors, just log the error
} else {
if err := scheduler.CleanupOnCompletion(ctx, rayJobInstance); err != nil {
logger.Error(err, "Failed to cleanup batch scheduler resources")
// Don't block the reconciliation on cleanup failures, just log the error
}
}
}

// The RayJob has reached a terminal state. Handle the cleanup and deletion logic.
// If the RayJob uses an existing RayCluster, we must not delete it.
if len(rayJobInstance.Spec.ClusterSelector) > 0 {