4 changes: 2 additions & 2 deletions pkg/util/testingjobs/rayjob/wrappers.go
@@ -65,8 +65,8 @@ func MakeJob(name, ns string) *JobWrapper {
{
GroupName: "workers-group-0",
Replicas: ptr.To[int32](1),
MinReplicas: ptr.To[int32](0),
MaxReplicas: ptr.To[int32](10),
MinReplicas: ptr.To[int32](1),
MaxReplicas: ptr.To[int32](5),
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
136 changes: 125 additions & 11 deletions test/e2e/singlecluster/kuberay_test.go
@@ -17,12 +17,15 @@ limitations under the License.
package e2e

import (
"strings"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -50,6 +53,17 @@ var _ = ginkgo.Describe("Kuberay", func() {
lq *kueue.LocalQueue
)

// countRunningWorkerPods counts the number of running pods that have "workers" in their name
countRunningWorkerPods := func(podList *corev1.PodList) int {
workerPodCount := 0
for _, pod := range podList.Items {
if strings.Contains(pod.Name, "workers") && pod.Status.Phase == corev1.PodRunning {
workerPodCount++
}
}
return workerPodCount
}

ginkgo.BeforeEach(func() {
ns = util.CreateNamespaceFromPrefixWithLog(ctx, k8sClient, "kuberay-e2e-")
rf = utiltestingapi.MakeResourceFlavor(resourceFlavorName).
@@ -60,7 +74,7 @@ var _ = ginkgo.Describe("Kuberay", func() {
cq = utiltestingapi.MakeClusterQueue(clusterQueueName).
ResourceGroup(
*utiltestingapi.MakeFlavorQuotas(rf.Name).
Resource(corev1.ResourceCPU, "1").Obj()).
Resource(corev1.ResourceCPU, "3").Obj()).
Obj()
util.CreateClusterQueuesAndWaitForActive(ctx, k8sClient, cq)

@@ -140,19 +154,46 @@ var _ = ginkgo.Describe("Kuberay", func() {
})
})

// TODO enhance to test scale up / down operations
// See https://github.com/kubernetes-sigs/kueue/pull/8082#discussion_r2605582024
ginkgo.It("Should run a rayjob with InTreeAutoscaling", func() {
kuberayTestImage := util.GetKuberayTestImage()

// Create ConfigMap with Python script
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "rayjob-autoscaling",
Namespace: ns.Name,
},
Data: map[string]string{
"sample_code.py": `import ray
import os

ray.init()

@ray.remote
def my_task(x, s):
import time
time.sleep(s)
return x * x

# run tasks in sequence to avoid triggering autoscaling in the beginning
print([ray.get(my_task.remote(i, 1)) for i in range(4)])

# run tasks in parallel to trigger autoscaling (scaling up)
print(ray.get([my_task.remote(i, 4) for i in range(16)]))

# run tasks in sequence to trigger scaling down
print([ray.get(my_task.remote(i, 1)) for i in range(16)])`,
},
}

rayJob := testingrayjob.MakeJob("rayjob-autoscaling", ns.Name).
Queue(localQueueName).
Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue).
EnableInTreeAutoscaling().
WithSubmissionMode(rayv1.K8sJobMode).
Entrypoint("python -c \"import ray; ray.init(); print(ray.cluster_resources())\"").
RequestAndLimit(rayv1.HeadNode, corev1.ResourceCPU, "300m").
RequestAndLimit(rayv1.WorkerNode, corev1.ResourceCPU, "300m").
Entrypoint("python /home/ray/samples/sample_code.py").
RequestAndLimit(rayv1.HeadNode, corev1.ResourceCPU, "200m").
RequestAndLimit(rayv1.WorkerNode, corev1.ResourceCPU, "200m").
WithSubmitterPodTemplate(corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
@@ -161,10 +202,10 @@ var _ = ginkgo.Describe("Kuberay", func() {
Image: kuberayTestImage,
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("300m"),
corev1.ResourceCPU: resource.MustParse("200m"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("300m"),
corev1.ResourceCPU: resource.MustParse("200m"),
},
},
},
@@ -175,19 +216,42 @@ var _ = ginkgo.Describe("Kuberay", func() {
Image(rayv1.HeadNode, kuberayTestImage).
Image(rayv1.WorkerNode, kuberayTestImage).Obj()

// Add volume and volumeMount to head node for the ConfigMap
rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Volumes = []corev1.Volume{
{
Name: "script-volume",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "rayjob-autoscaling",
},
},
},
},
}
rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{
{
Name: "script-volume",
MountPath: "/home/ray/samples",
},
}

ginkgo.By("Creating the ConfigMap", func() {
gomega.Expect(k8sClient.Create(ctx, configMap)).Should(gomega.Succeed())
})

ginkgo.By("Creating the rayJob", func() {
gomega.Expect(k8sClient.Create(ctx, rayJob)).Should(gomega.Succeed())
})

ginkgo.By("Checking one workload is created and admitted", func() {
ginkgo.By("Checking one workload is created", func() {
gomega.Eventually(func(g gomega.Gomega) {
workloadList := &kueue.WorkloadList{}
g.Expect(k8sClient.List(ctx, workloadList, client.InNamespace(ns.Name))).To(gomega.Succeed())
g.Expect(workloadList.Items).NotTo(gomega.BeEmpty(), "Expected at least one workload in namespace")
hasAdmittedWorkload := false
for _, wl := range workloadList.Items {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) ||
apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) {
hasAdmittedWorkload = true
break
}
@@ -205,6 +269,56 @@ var _ = ginkgo.Describe("Kuberay", func() {
}, util.VeryLongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for 3 pods in rayjob namespace", func() {
// 3 rayjob pods: head, worker, submitter job
gomega.Eventually(func(g gomega.Gomega) {
podList := &corev1.PodList{}
g.Expect(k8sClient.List(ctx, podList, client.InNamespace(ns.Name))).To(gomega.Succeed())
g.Expect(podList.Items).To(gomega.HaveLen(3), "Expected exactly 3 pods in rayjob namespace")
// Count pods that have "workers" in their name
workerPodCount := countRunningWorkerPods(podList)
g.Expect(workerPodCount).To(gomega.Equal(1), "Expected exactly 1 pod with 'workers' in the name")
}, util.VeryLongTimeout, util.Interval).Should(gomega.Succeed())
Contributor:
I am also wondering why we are using VeryLongTimeout. Isn't LongTimeout enough?

Contributor:
Also in other places.

Contributor Author:
Using VeryLongTimeout helps make the test more stable. I tested with different sleep times for scaling up/down and settled on around 5-10 seconds; adding extra time for pod startup, VeryLongTimeout makes sure we have enough time to avoid random failures. I did another pass and removed some of the VeryLongTimeout usages while keeping the rest.

Contributor:
Oh, I think this is likely because termination takes 30s by default. I think we could largely mitigate this by setting TerminationGracePeriodSeconds in the PodTemplate. PTAL, as we already have the TerminationGracePeriod helper introduced for many Job types.

With TerminationGracePeriodSeconds I expect LongTimeout is typically OK, but please check. I'm OK with VeryLongTimeout if needed.

Contributor Author:
Added TerminationGracePeriodSeconds of 5 seconds to the code. I still feel VeryLongTimeout is safer, in case the test run has some randomness and takes a bit longer. Also, VeryLongTimeout is only the worst case; if the test finishes earlier, it will not actually wait that long.
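For illustration only, a minimal sketch of what that follow-up could look like, assuming the grace period is set directly on the generated pod specs; the exact TerminationGracePeriod wrapper helper mentioned above is not shown in this diff, and ptr comes from k8s.io/utils/ptr:

// Sketch only, not the exact change from the updated PR: shorten pod
// termination so scale-down completes well within the chosen timeout.
gracePeriod := ptr.To[int64](5)
rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.TerminationGracePeriodSeconds = gracePeriod
for i := range rayJob.Spec.RayClusterSpec.WorkerGroupSpecs {
	rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[i].Template.Spec.TerminationGracePeriodSeconds = gracePeriod
}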

})

ginkgo.By("Waiting for 2 workloads", func() {
// 2 workloads: one for the ray job, another for the submitter job created by the ray job
gomega.Eventually(func(g gomega.Gomega) {
workloadList := &kueue.WorkloadList{}
g.Expect(k8sClient.List(ctx, workloadList, client.InNamespace(ns.Name))).To(gomega.Succeed())
g.Expect(workloadList.Items).To(gomega.HaveLen(2), "Expected exactly 2 workloads")
}, util.VeryLongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for 5 workers due to scaling up", func() {
gomega.Eventually(func(g gomega.Gomega) {
podList := &corev1.PodList{}
g.Expect(k8sClient.List(ctx, podList, client.InNamespace(ns.Name))).To(gomega.Succeed())
// Count pods that have "workers" in their name
workerPodCount := countRunningWorkerPods(podList)
g.Expect(workerPodCount).To(gomega.Equal(5), "Expected exactly 5 pods with 'workers' in the name")
Contributor:
Actually, it would be better if this helper returned pod names rather than just a count.

Then I think we should verify that the new 5 pods are a superset of the old pods; otherwise they might all have been re-created. Similarly, on scale-down we would like to check that the new set of pods is a subset of the previous pods.

Let me know what you think, but I expect this will make the tests prove that scaling really works. Otherwise, this was also possible in the past by suspending and re-admitting the Job.

Contributor Author:
Cool, I like the suggestion, let me add this!

Contributor Author:
Updated the PR to check that the pod names form a superset after scaling up and a subset after scaling down.
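As a rough sketch of the suggested set-based assertions, assuming gomega's ContainElements matcher; the variable names namesBeforeScaleUp, namesAfterScaleUp, and namesAfterScaleDown are illustrative, not taken from the updated PR:

// Sketch only: return the names of running worker pods instead of a count,
// so assertions can compare sets of pods across scaling steps.
runningWorkerPodNames := func(podList *corev1.PodList) []string {
	var names []string
	for _, pod := range podList.Items {
		if strings.Contains(pod.Name, "workers") && pod.Status.Phase == corev1.PodRunning {
			names = append(names, pod.Name)
		}
	}
	return names
}

// Scale-up: the new worker set should contain every pre-scale-up worker.
g.Expect(namesAfterScaleUp).To(gomega.ContainElements(namesBeforeScaleUp))
// Scale-down: every remaining worker should come from the scale-up set.
g.Expect(namesAfterScaleUp).To(gomega.ContainElements(namesAfterScaleDown))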

}, util.VeryLongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for 3 workloads due to scaling up creating another workload", func() {
// 3 workloads now: after scaling up, a new workload is created for the new resource request
gomega.Eventually(func(g gomega.Gomega) {
workloadList := &kueue.WorkloadList{}
g.Expect(k8sClient.List(ctx, workloadList, client.InNamespace(ns.Name))).To(gomega.Succeed())
g.Expect(workloadList.Items).To(gomega.HaveLen(3), "Expected exactly 3 workloads")
}, util.VeryLongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for workers reduced to 1 due to scaling down", func() {
gomega.Eventually(func(g gomega.Gomega) {
podList := &corev1.PodList{}
g.Expect(k8sClient.List(ctx, podList, client.InNamespace(ns.Name))).To(gomega.Succeed())
// Count pods that have "workers" in their name
workerPodCount := countRunningWorkerPods(podList)
g.Expect(workerPodCount).To(gomega.Equal(1), "Expected exactly 1 pod with 'workers' in the name")
}, util.VeryLongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for the RayJob to finish", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdRayJob := &rayv1.RayJob{}