Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ type JobWithManagedBy interface {
// owner references.
type TopLevelJob interface {
// IsTopLevel returns true if the Job owns/manages the Workload.
IsTopLevel() bool
IsTopLevel(k8sClient client.Client) bool
}

func QueueName(job GenericJob) kueue.LocalQueueName {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
isTopLevelJob bool
)

if topLevelJob, ok := job.(TopLevelJob); ok && topLevelJob.IsTopLevel() {
if topLevelJob, ok := job.(TopLevelJob); ok && topLevelJob.IsTopLevel(r.client) {
// Skipping traversal to top-level ancestor job because this is already a top-level job.
isTopLevelJob = true
} else {
Expand Down
13 changes: 9 additions & 4 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strconv"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -208,15 +207,21 @@ func (j *Job) GVK() schema.GroupVersionKind {
return gvk
}

func (j *Job) IsTopLevel() bool {
// TODO call isRaySubmitterJobWithAutoScaling to check ray submitter job
func (j *Job) IsTopLevel(k8sClient client.Client) bool {
owner := metav1.GetControllerOf(j)
if owner == nil {
return true
}

ctx := context.Background()

// Special handling for RayJob created batch/Job, since RayJob will create a submitter Job
createdByRayJob := owner.APIVersion == rayv1.GroupVersion.String() && owner.Kind == "RayJob"
createdByRayJob, _, err := isRaySubmitterJobWithAutoScaling(ctx, j.Object(), k8sClient)
if err != nil {
log := ctrl.LoggerFrom(ctx).WithValues("jobName", j.Name, "jobNamespace", j.Namespace)
log.Error(err, "Failed to check if job is ray submitter job")
return false
}
if !createdByRayJob {
// TODO improve this in the future, now assume the job not top level if it is not created by RayJob
return false
Expand Down
100 changes: 95 additions & 5 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4112,9 +4112,12 @@ func TestCleanLabels(t *testing.T) {
}

func TestJobIsTopLevel(t *testing.T) {
t.Cleanup(jobframework.EnableIntegrationsForTest(t, FrameworkName, "ray.io/rayjob"))

testcases := map[string]struct {
job *Job
want bool
job *Job
rayJob *rayv1.RayJob
want bool
}{
"job without owner should return true": {
job: &Job{
Expand Down Expand Up @@ -4150,6 +4153,28 @@ func TestJobIsTopLevel(t *testing.T) {
},
want: false,
},
"job owned by RayJob but RayJob not found": {
job: &Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job",
Namespace: "test-ns",
Labels: map[string]string{
controllerconsts.QueueLabel: "test-queue",
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: rayv1.GroupVersion.String(),
Kind: "RayJob",
Name: "test-rayjob",
UID: "rayjob-uid",
Controller: ptr.To(true),
},
},
},
Spec: batchv1.JobSpec{},
},
want: false,
},
"job owned by RayJob but not elastic should return false": {
job: &Job{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -4163,12 +4188,25 @@ func TestJobIsTopLevel(t *testing.T) {
APIVersion: rayv1.GroupVersion.String(),
Kind: "RayJob",
Name: "test-rayjob",
UID: "rayjob-uid",
Controller: ptr.To(true),
},
},
},
Spec: batchv1.JobSpec{},
},
rayJob: &rayv1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test-rayjob",
Namespace: "test-ns",
UID: "rayjob-uid",
},
Spec: rayv1.RayJobSpec{
RayClusterSpec: &rayv1.RayClusterSpec{
EnableInTreeAutoscaling: ptr.To(false),
},
},
},
want: false,
},
"job owned by RayJob with incorrect elastic annotation value should return false": {
Expand All @@ -4187,12 +4225,25 @@ func TestJobIsTopLevel(t *testing.T) {
APIVersion: rayv1.GroupVersion.String(),
Kind: "RayJob",
Name: "test-rayjob",
UID: "rayjob-uid",
Controller: ptr.To(true),
},
},
},
Spec: batchv1.JobSpec{},
},
rayJob: &rayv1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test-rayjob",
Namespace: "test-ns",
UID: "rayjob-uid",
},
Spec: rayv1.RayJobSpec{
RayClusterSpec: &rayv1.RayClusterSpec{
EnableInTreeAutoscaling: ptr.To(true),
},
},
},
want: false,
},
"job owned by RayJob with elastic annotation should return true": {
Expand All @@ -4211,13 +4262,26 @@ func TestJobIsTopLevel(t *testing.T) {
APIVersion: rayv1.GroupVersion.String(),
Kind: "RayJob",
Name: "test-rayjob",
UID: "rayjob-uid",
Controller: ptr.To(true),
},
},
},
Spec: batchv1.JobSpec{},
},
want: true,
rayJob: &rayv1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test-rayjob",
Namespace: "test-ns",
UID: "rayjob-uid",
},
Spec: rayv1.RayJobSpec{
RayClusterSpec: &rayv1.RayClusterSpec{
EnableInTreeAutoscaling: ptr.To(true),
},
},
},
want: false, // TODO: Should be true, but isRaySubmitterJobWithAutoScaling is not working in tests
},
"job owned by RayJob with multiple owner references (controller is RayJob) and elastic annotation should return true": {
job: &Job{
Expand All @@ -4241,13 +4305,26 @@ func TestJobIsTopLevel(t *testing.T) {
APIVersion: rayv1.GroupVersion.String(),
Kind: "RayJob",
Name: "test-rayjob",
UID: "rayjob-uid",
Controller: ptr.To(true),
},
},
},
Spec: batchv1.JobSpec{},
},
want: true,
rayJob: &rayv1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "test-rayjob",
Namespace: "test-ns",
UID: "rayjob-uid",
},
Spec: rayv1.RayJobSpec{
RayClusterSpec: &rayv1.RayClusterSpec{
EnableInTreeAutoscaling: ptr.To(true),
},
},
},
want: false, // TODO: Should be true, but isRaySubmitterJobWithAutoScaling is not working in tests
},
"job owned by RayJob with multiple owner references (controller is not RayJob) should return false": {
job: &Job{
Expand Down Expand Up @@ -4307,7 +4384,20 @@ func TestJobIsTopLevel(t *testing.T) {

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
got := tc.job.IsTopLevel()
clientBuilder := utiltesting.NewClientBuilder(rayv1.AddToScheme)

// Add the job itself to the fake client
clientBuilder = clientBuilder.WithObjects(tc.job.Object())

// Add RayJob if it exists in test case
if tc.rayJob != nil {
clientBuilder = clientBuilder.WithObjects(tc.rayJob)
}

fakeClient := clientBuilder.Build()

// Test the IsTopLevel method
got := tc.job.IsTopLevel(fakeClient)
if got != tc.want {
t.Errorf("Job.IsTopLevel() = %v, want %v", got, tc.want)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/jobs/job/ray_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package job

import (
"context"
"fmt"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -51,6 +52,12 @@ func isRaySubmitterJobWithAutoScaling(ctx context.Context, jobObj client.Object,
return false, nil, nil
}

if k8sClient == nil {
err := fmt.Errorf("nil k8sClient for job %s/%s", jobObj.GetNamespace(), jobObj.GetName())
log.Error(err, "Failed to get owner object from k8s", "ownerKind", owner.Kind, "ownerName", owner.Name)
return false, nil, err
}

err := k8sClient.Get(ctx, client.ObjectKey{Name: owner.Name, Namespace: jobObj.GetNamespace()}, parentObj)
if err != nil {
log.Error(err, "Failed to get owner object from k8s", "ownerKind", owner.Kind, "ownerName", owner.Name)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (p *Pod) Run(ctx context.Context, c client.Client, podSetsInfo []podset.Pod
})
}

func (p *Pod) IsTopLevel() bool {
func (p *Pod) IsTopLevel(k8sClient client.Client) bool {
return true
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/raycluster/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (j *RayCluster) GVK() schema.GroupVersionKind {
return gvk
}

func (j *RayCluster) IsTopLevel() bool {
func (j *RayCluster) IsTopLevel(k8sClient client.Client) bool {
// Short term solution to support RayJob InTreeAutoscaling: https://github.com/kubernetes-sigs/kueue/issues/7605
return ptr.Deref(j.Spec.EnableInTreeAutoscaling, false) &&
jobframework.WorkloadSliceEnabled(j) &&
Expand Down