9 changes: 9 additions & 0 deletions helm-chart/kuberay-operator/templates/_helpers.tpl
@@ -356,8 +356,17 @@ rules:
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- scheduling.volcano.sh
resources:
- podgroups/status
verbs:
- get
- patch
- update
{{- end -}}
{{- if or .batchSchedulerEnabled (eq .batchSchedulerName "scheduler-plugins") }}
- apiGroups:
@@ -23,6 +23,11 @@ type BatchScheduler interface {
// AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
// For example, setting labels for queues / priority, and setting schedulerName.
AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)

// CleanupOnCompletion handles cleanup when the RayJob reaches a terminal state (Complete/Failed).
// For batch schedulers such as Volcano, this deletes the PodGroup to release queue resources.
// It is a no-op for schedulers that do not need any cleanup.
CleanupOnCompletion(ctx context.Context, object metav1.Object) error
}
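// For illustration only: the expected call site mirrors the rayjob_controller.go change in this
// diff. When a RayJob reaches a terminal state, the reconciler looks up the configured scheduler
// and asks it to clean up. This is a sketch; it assumes the reconciler fields shown in that change.
//
//	if mgr := r.options.BatchSchedulerManager; mgr != nil {
//		if scheduler, err := mgr.GetScheduler(); err == nil {
//			if err := scheduler.CleanupOnCompletion(ctx, rayJobInstance); err != nil {
//				logger.Error(err, "Failed to cleanup batch scheduler resources")
//			}
//		}
//	}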

// BatchSchedulerFactory handles initial setup of the scheduler plugin by registering the
@@ -58,6 +63,10 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context,
func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
}

func (d *DefaultBatchScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
return nil
}

func (df *DefaultBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (BatchScheduler, error) {
return &DefaultBatchScheduler{}, nil
}
@@ -63,6 +63,11 @@ func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent me
child.SetLabels(childLabels)
}

func (k *KaiScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
// KaiScheduler doesn't need cleanup
return nil
}

func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
return &KaiScheduler{}, nil
}
@@ -113,6 +113,11 @@ func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
return exist
}

func (k *KubeScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
// KubeScheduler doesn't need cleanup
return nil
}

func (kf *KubeSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
if err := v1alpha1.AddToScheme(cli.Scheme()); err != nil {
return nil, err
@@ -252,6 +252,83 @@ func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, pa
addSchedulerName(child, v.Name())
}

// CleanupOnCompletion deletes the PodGroup when the RayJob finishes.
// It is called when the RayJob reaches a terminal state (Complete/Failed).
//
// Why delete instead of marking as Completed?
//
// The Volcano scheduler runs a continuous control loop that recalculates and updates
// PodGroup status in every scheduling cycle (see volcano/pkg/scheduler/framework/job_updater.go:116).
// The status calculation logic (getPodGroupPhase in session.go:614) works as follows:
//
// 1. If scheduled pods < MinMember → return Pending
// 2. If scheduled pods >= MinMember and all completed → return Completed
// 3. If scheduled pods >= MinMember and some running → return Running
// 4. If current status is Inqueue → return Inqueue (preserve it)
// 5. Otherwise → return Pending
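//
// As a rough, paraphrased sketch (not Volcano's actual code; the helper name and signature are
// approximations of the rules above):
//
//	func recalculatedPhase(scheduled, minMember int32, allCompleted, anyRunning bool, current PodGroupPhase) PodGroupPhase {
//		switch {
//		case scheduled < minMember:
//			return PodGroupPending // rule 1
//		case allCompleted:
//			return PodGroupCompleted // rule 2
//		case anyRunning:
//			return PodGroupRunning // rule 3
//		case current == PodGroupInqueue:
//			return PodGroupInqueue // rule 4
//		default:
//			return PodGroupPending // rule 5
//		}
//	}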
//
// When RayJob finishes and RayCluster is deleted:
// - All pods are deleted, so scheduled = 0
// - Since scheduled (0) < MinMember (e.g., 3), condition #1 applies
// - The function returns Pending
// - The enqueue action then changes Pending to Inqueue (enqueue.go:97)
//
// Therefore, marking the PodGroup as Completed doesn't work because:
// 1. We set status to Completed
// 2. Volcano scheduler runs its next cycle
// 3. jobStatus() recalculates: "scheduled < MinMember, so status = Pending"
// 4. enqueue action sees Pending and changes it to Inqueue
// 5. User sees PodGroup stuck in Inqueue state, queue resources NOT released
//
// By deleting the PodGroup entirely:
// - Volcano scheduler can't find the PodGroup (NotFound)
// - Skips all status updates for this PodGroup
// - Queue resources are immediately and permanently released
//
// See: https://github.com/volcano-sh/volcano/blob/master/pkg/scheduler/framework/job_updater.go
//
// https://github.com/volcano-sh/volcano/blob/master/pkg/scheduler/framework/session.go
func (v *VolcanoBatchScheduler) CleanupOnCompletion(ctx context.Context, object metav1.Object) error {
logger := ctrl.LoggerFrom(ctx).WithName(pluginName)

// Only handle RayJob here; PodGroups created for a RayCluster are cleaned up via OwnerReference.
rayJob, ok := object.(*rayv1.RayJob)
if !ok {
return nil
}

podGroupName := getAppPodGroupName(rayJob)
podGroup := volcanoschedulingv1beta1.PodGroup{}

if err := v.cli.Get(ctx, types.NamespacedName{
Namespace: rayJob.Namespace,
Name: podGroupName,
}, &podGroup); err != nil {
if errors.IsNotFound(err) {
logger.Info("PodGroup not found, already deleted", "podGroupName", podGroupName)
return nil
}
logger.Error(err, "failed to get PodGroup", "podGroupName", podGroupName)
return err
}

// Delete the PodGroup to release queue resources immediately.
// Deleting (rather than updating status) ensures Volcano stops tracking it entirely.
if err := v.cli.Delete(ctx, &podGroup); err != nil {
// If already deleted, that's fine
if errors.IsNotFound(err) {
logger.Info("PodGroup already deleted", "podGroupName", podGroupName)
return nil
}
logger.Error(err, "failed to delete PodGroup", "podGroupName", podGroupName)
return err
}

logger.Info("PodGroup deleted to release queue resources", "podGroupName", podGroupName,
"minMember", podGroup.Spec.MinMember)
return nil
}

func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil {
return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)
@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -512,3 +513,172 @@ func TestGetAppPodGroupName(t *testing.T) {
rayJob := createTestRayJob(1)
a.Equal("ray-rayjob-sample-pg", getAppPodGroupName(&rayJob))
}

func TestCleanupOnCompletion(t *testing.T) {
a := assert.New(t)
require := require.New(t)

t.Run("RayJob - delete PodGroup", func(_ *testing.T) {
rayJob := createTestRayJob(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Manually create a PodGroup in Pending state to simulate the real scenario
podGroupName := getAppPodGroupName(&rayJob)
pg := &volcanoschedulingv1beta1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: podGroupName,
Namespace: rayJob.Namespace,
},
Spec: volcanoschedulingv1beta1.PodGroupSpec{
MinMember: 3,
},
Status: volcanoschedulingv1beta1.PodGroupStatus{
Phase: volcanoschedulingv1beta1.PodGroupPending,
Running: 1,
},
}
err := fakeCli.Create(ctx, pg)
require.NoError(err)

// Verify PodGroup was created in Pending state
var retrievedPg volcanoschedulingv1beta1.PodGroup
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
require.NoError(err)
a.Equal(volcanoschedulingv1beta1.PodGroupPending, retrievedPg.Status.Phase)

// Now call CleanupOnCompletion to simulate RayJob finishing
err = scheduler.CleanupOnCompletion(ctx, &rayJob)
require.NoError(err)

// Verify PodGroup was deleted
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
require.Error(err)
a.True(errors.IsNotFound(err))
})

t.Run("RayJob - PodGroup not found (already deleted)", func(_ *testing.T) {
rayJob := createTestRayJob(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Don't create a PodGroup, just call CleanupOnCompletion
err := scheduler.CleanupOnCompletion(ctx, &rayJob)
// Should not return an error, just log that PodGroup was not found
require.NoError(err)
})

t.Run("RayCluster - should be no-op", func(_ *testing.T) {
rayCluster := createTestRayCluster(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Call CleanupOnCompletion with RayCluster - should be no-op
err := scheduler.CleanupOnCompletion(ctx, &rayCluster)
require.NoError(err)

// Verify no PodGroup was created (RayCluster PodGroups are not managed by this method)
var pg volcanoschedulingv1beta1.PodGroup
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayCluster.Namespace, Name: getAppPodGroupName(&rayCluster)}, &pg)
require.Error(err) // Should not be found
a.True(errors.IsNotFound(err))
})

t.Run("RayJob - PodGroup in Inqueue state (bug scenario)", func(_ *testing.T) {
rayJob := createTestRayJob(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Create a PodGroup in Inqueue state to simulate the bug scenario
podGroupName := getAppPodGroupName(&rayJob)
pg := &volcanoschedulingv1beta1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: podGroupName,
Namespace: rayJob.Namespace,
},
Spec: volcanoschedulingv1beta1.PodGroupSpec{
MinMember: 31,
},
Status: volcanoschedulingv1beta1.PodGroupStatus{
Phase: volcanoschedulingv1beta1.PodGroupInqueue,
Running: 0,
Succeeded: 1,
},
}
err := fakeCli.Create(ctx, pg)
require.NoError(err)

// Call CleanupOnCompletion
err = scheduler.CleanupOnCompletion(ctx, &rayJob)
require.NoError(err)

// Verify PodGroup was deleted
var retrievedPg volcanoschedulingv1beta1.PodGroup
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
require.Error(err)
a.True(errors.IsNotFound(err))
})

t.Run("RayJob - idempotent (can call multiple times)", func(_ *testing.T) {
rayJob := createTestRayJob(1)
scheme := runtime.NewScheme()
a.NoError(rayv1.AddToScheme(scheme))
a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
fakeCli := fake.NewClientBuilder().WithScheme(scheme).WithStatusSubresource(&volcanoschedulingv1beta1.PodGroup{}).Build()
scheduler := &VolcanoBatchScheduler{cli: fakeCli}

ctx := context.Background()

// Create a PodGroup
podGroupName := getAppPodGroupName(&rayJob)
pg := &volcanoschedulingv1beta1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: podGroupName,
Namespace: rayJob.Namespace,
},
Spec: volcanoschedulingv1beta1.PodGroupSpec{
MinMember: 3,
},
Status: volcanoschedulingv1beta1.PodGroupStatus{
Phase: volcanoschedulingv1beta1.PodGroupPending,
Running: 1,
},
}
err := fakeCli.Create(ctx, pg)
require.NoError(err)

// Call CleanupOnCompletion first time
err = scheduler.CleanupOnCompletion(ctx, &rayJob)
require.NoError(err)

// Verify PodGroup was deleted
var retrievedPg volcanoschedulingv1beta1.PodGroup
err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: podGroupName}, &retrievedPg)
require.Error(err)
a.True(errors.IsNotFound(err))

// Call CleanupOnCompletion second time - should not error
err = scheduler.CleanupOnCompletion(ctx, &rayJob)
require.NoError(err)
})
}
@@ -136,6 +136,11 @@ func (y *YuniKornScheduler) AddMetadataToChildResource(ctx context.Context, pare
}
}

func (y *YuniKornScheduler) CleanupOnCompletion(_ context.Context, _ metav1.Object) error {
// YuniKorn doesn't need cleanup
return nil
}

func (yf *YuniKornSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
return &YuniKornScheduler{}, nil
}
15 changes: 15 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -408,6 +408,21 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
// The RayJob is already suspended, we should not requeue it.
return ctrl.Result{}, nil
case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
// Clean up batch scheduler resources (e.g., delete Volcano PodGroup)
// Do this before the rest of the deletion logic so the PodGroup's queue resources are released promptly
if r.options.BatchSchedulerManager != nil {
scheduler, err := r.options.BatchSchedulerManager.GetScheduler()
if err != nil {
logger.Error(err, "Failed to get batch scheduler")
// Don't block the reconciliation on scheduler errors; just log the error
} else {
if err := scheduler.CleanupOnCompletion(ctx, rayJobInstance); err != nil {
logger.Error(err, "Failed to cleanup batch scheduler resources")
// Don't block the reconciliation on cleanup failures; just log the error
}
}
}

// The RayJob has reached a terminal state. Handle the cleanup and deletion logic.
// If the RayJob uses an existing RayCluster, we must not delete it.
if len(rayJobInstance.Spec.ClusterSelector) > 0 {