4 changes: 2 additions & 2 deletions pkg/util/testingjobs/rayjob/wrappers.go
@@ -65,8 +65,8 @@ func MakeJob(name, ns string) *JobWrapper {
{
GroupName: "workers-group-0",
Replicas: ptr.To[int32](1),
- MinReplicas: ptr.To[int32](0),
- MaxReplicas: ptr.To[int32](10),
+ MinReplicas: ptr.To[int32](1),
+ MaxReplicas: ptr.To[int32](5),
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
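Note: the wrapper's default worker group now starts at MinReplicas 1 and is capped at MaxReplicas 5 (previously 0 and 10), matching the 1 → 5 → 1 worker counts asserted by the autoscaling e2e test below. A minimal usage sketch of the updated defaults, assuming only the wrapper API already shown in this diff (the values in the comment are the new defaults, not output captured from a run):

rayJob := testingrayjob.MakeJob("sample", "default").Obj()
workers := rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0]
// With the updated defaults: *workers.Replicas == 1, *workers.MinReplicas == 1, *workers.MaxReplicas == 5.
_ = workers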
181 changes: 172 additions & 9 deletions test/e2e/singlecluster/kuberay_test.go
@@ -17,12 +17,16 @@ limitations under the License.
package e2e

import (
"strings"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2"
@@ -50,6 +54,36 @@ var _ = ginkgo.Describe("Kuberay", func() {
lq *kueue.LocalQueue
)

// getRunningWorkerPodNames returns the names of running pods that have "workers" in their name
getRunningWorkerPodNames := func(podList *corev1.PodList) []string {
var podNames []string
for _, pod := range podList.Items {
if strings.Contains(pod.Name, "workers") && pod.Status.Phase == corev1.PodRunning {
podNames = append(podNames, pod.Name)
}
}
return podNames
}

// verifyPodNamesAreSuperset checks that superset contains all names from subset
verifyPodNamesAreSuperset := func(superset, subset []string) bool {
if len(superset) < len(subset) {
return false
}

supersetMap := make(map[string]bool)
for _, name := range superset {
supersetMap[name] = true
}

for _, subsetName := range subset {
if !supersetMap[subsetName] {
return false
}
}
return true
}

ginkgo.BeforeEach(func() {
ns = util.CreateNamespaceFromPrefixWithLog(ctx, k8sClient, "kuberay-e2e-")
rf = utiltestingapi.MakeResourceFlavor(resourceFlavorName).
@@ -60,7 +94,7 @@ var _ = ginkgo.Describe("Kuberay", func() {
cq = utiltestingapi.MakeClusterQueue(clusterQueueName).
ResourceGroup(
*utiltestingapi.MakeFlavorQuotas(rf.Name).
- Resource(corev1.ResourceCPU, "1").Obj()).
+ Resource(corev1.ResourceCPU, "3").Obj()).
Obj()
util.CreateClusterQueuesAndWaitForActive(ctx, k8sClient, cq)

@@ -140,19 +174,46 @@ var _ = ginkgo.Describe("Kuberay", func() {
})
})

// TODO enhance to test scale up / down operations
// See https://github.com/kubernetes-sigs/kueue/pull/8082#discussion_r2605582024
ginkgo.It("Should run a rayjob with InTreeAutoscaling", func() {
kuberayTestImage := util.GetKuberayTestImage()

// Create ConfigMap with Python script
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "rayjob-autoscaling",
Namespace: ns.Name,
},
Data: map[string]string{
"sample_code.py": `import ray
import os

ray.init()

@ray.remote
def my_task(x, s):
import time
time.sleep(s)
return x * x

# run tasks in sequence to avoid triggering autoscaling in the beginning
print([ray.get(my_task.remote(i, 1)) for i in range(4)])

# run tasks in parallel to trigger autoscaling (scaling up)
print(ray.get([my_task.remote(i, 4) for i in range(16)]))

# run tasks in sequence to trigger scaling down
print([ray.get(my_task.remote(i, 1)) for i in range(16)])`,
},
}

rayJob := testingrayjob.MakeJob("rayjob-autoscaling", ns.Name).
Queue(localQueueName).
Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue).
EnableInTreeAutoscaling().
WithSubmissionMode(rayv1.K8sJobMode).
Entrypoint("python -c \"import ray; ray.init(); print(ray.cluster_resources())\"").
RequestAndLimit(rayv1.HeadNode, corev1.ResourceCPU, "300m").
RequestAndLimit(rayv1.WorkerNode, corev1.ResourceCPU, "300m").
Entrypoint("python /home/ray/samples/sample_code.py").
RequestAndLimit(rayv1.HeadNode, corev1.ResourceCPU, "200m").
RequestAndLimit(rayv1.WorkerNode, corev1.ResourceCPU, "200m").
WithSubmitterPodTemplate(corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
@@ -161,10 +222,10 @@ var _ = ginkgo.Describe("Kuberay", func() {
Image: kuberayTestImage,
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
- corev1.ResourceCPU: resource.MustParse("300m"),
+ corev1.ResourceCPU: resource.MustParse("200m"),
},
Limits: corev1.ResourceList{
- corev1.ResourceCPU: resource.MustParse("300m"),
+ corev1.ResourceCPU: resource.MustParse("200m"),
},
},
},
@@ -175,11 +236,47 @@ var _ = ginkgo.Describe("Kuberay", func() {
Image(rayv1.HeadNode, kuberayTestImage).
Image(rayv1.WorkerNode, kuberayTestImage).Obj()

// Add volume and volumeMount to head node for the ConfigMap
rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Volumes = []corev1.Volume{
{
Name: "script-volume",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "rayjob-autoscaling",
},
},
},
},
}
rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{
{
Name: "script-volume",
MountPath: "/home/ray/samples",
},
}
rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(5))
for i := range rayJob.Spec.RayClusterSpec.WorkerGroupSpecs {
rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[i].Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(5))
}

ginkgo.By("Creating the ConfigMap", func() {
gomega.Expect(k8sClient.Create(ctx, configMap)).Should(gomega.Succeed())
})
ginkgo.DeferCleanup(func() {
gomega.Expect(k8sClient.Delete(ctx, configMap)).Should(gomega.Succeed())
})

ginkgo.By("Creating the rayJob", func() {
gomega.Expect(k8sClient.Create(ctx, rayJob)).Should(gomega.Succeed())
})

ginkgo.By("Checking one workload is created and admitted", func() {
// Variable to store initial pod names for verification during scaling
var initialPodNames []string
// Variable to store scaled-up pod names for verification during scaling down
var scaledUpPodNames []string

ginkgo.By("Checking one workload is created", func() {
gomega.Eventually(func(g gomega.Gomega) {
workloadList := &kueue.WorkloadList{}
g.Expect(k8sClient.List(ctx, workloadList, client.InNamespace(ns.Name))).To(gomega.Succeed())
@@ -204,6 +301,72 @@ var _ = ginkgo.Describe("Kuberay", func() {
}, util.VeryLongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for 3 pods in rayjob namespace", func() {
// 3 rayjob pods: head, worker, submitter job
gomega.Eventually(func(g gomega.Gomega) {
podList := &corev1.PodList{}
g.Expect(k8sClient.List(ctx, podList, client.InNamespace(ns.Name))).To(gomega.Succeed())
g.Expect(podList.Items).To(gomega.HaveLen(3), "Expected exactly 3 pods in rayjob namespace")
// Get worker pod names and check count
workerPodNames := getRunningWorkerPodNames(podList)
g.Expect(workerPodNames).To(gomega.HaveLen(1), "Expected exactly 1 pod with 'workers' in the name")

// Store initial pod names for later verification
initialPodNames = workerPodNames
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for 2 workloads", func() {
// 2 workloads: one for the ray cluster, another for the submitter job created by the ray job
gomega.Eventually(func(g gomega.Gomega) {
workloadList := &kueue.WorkloadList{}
g.Expect(k8sClient.List(ctx, workloadList, client.InNamespace(ns.Name))).To(gomega.Succeed())
g.Expect(workloadList.Items).To(gomega.HaveLen(2), "Expected exactly 2 workloads")
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for 5 workers due to scaling up", func() {
gomega.Eventually(func(g gomega.Gomega) {
podList := &corev1.PodList{}
g.Expect(k8sClient.List(ctx, podList, client.InNamespace(ns.Name))).To(gomega.Succeed())
// Get worker pod names and check count
currentPodNames := getRunningWorkerPodNames(podList)
g.Expect(currentPodNames).To(gomega.HaveLen(5), "Expected exactly 5 pods with 'workers' in the name")

// Verify that the current pod names are a superset of the initial pod names
g.Expect(verifyPodNamesAreSuperset(currentPodNames, initialPodNames)).To(gomega.BeTrue(),
"Current worker pod names should be a superset of initial pod names. "+
"Initial pods: %v, Current pods: %v", initialPodNames, currentPodNames)

// Store scaled-up pod names for later verification during scaling down
scaledUpPodNames = currentPodNames
}, util.VeryLongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for 3 workloads due to scaling up creating another workload", func() {
// 3 workloads now: after scaling up, a new workload is created for the new resource request
gomega.Eventually(func(g gomega.Gomega) {
workloadList := &kueue.WorkloadList{}
g.Expect(k8sClient.List(ctx, workloadList, client.InNamespace(ns.Name))).To(gomega.Succeed())
g.Expect(workloadList.Items).To(gomega.HaveLen(3), "Expected exactly 3 workloads")
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for workers reduced to 1 due to scaling down", func() {
gomega.Eventually(func(g gomega.Gomega) {
podList := &corev1.PodList{}
g.Expect(k8sClient.List(ctx, podList, client.InNamespace(ns.Name))).To(gomega.Succeed())
// Get worker pod names and check count
workerPodNames := getRunningWorkerPodNames(podList)
g.Expect(workerPodNames).To(gomega.HaveLen(1), "Expected exactly 1 pod with 'workers' in the name")

// Verify that the previous scaled-up pod names are a superset of the current pod names
g.Expect(verifyPodNamesAreSuperset(scaledUpPodNames, workerPodNames)).To(gomega.BeTrue(),
"Previous scaled-up worker pod names should be a superset of current pod names. "+
"Scaled-up pods: %v, Current pods: %v", scaledUpPodNames, workerPodNames)
}, util.VeryLongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for the RayJob to finish", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdRayJob := &rayv1.RayJob{}
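The scale-up and scale-down assertions above rely on the two closures defined near the top of the Describe block: getRunningWorkerPodNames filters the pod list down to running pods whose name contains "workers", and verifyPodNamesAreSuperset confirms that scaling preserved the original workers. A small illustrative sketch of how they compose; the pod names are fabricated for illustration, and the closures are only in scope inside the Describe block:

podList := &corev1.PodList{Items: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "rayjob-autoscaling-raycluster-workers-group-0-aaaaa"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "rayjob-autoscaling-raycluster-head-bbbbb"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
}}
initial := getRunningWorkerPodNames(podList) // only the "workers" pod is running and matches
scaledUp := append(append([]string{}, initial...), "rayjob-autoscaling-raycluster-workers-group-0-ccccc")
// After scale-up the original worker is still present, so the new set is a superset of the old one.
_ = verifyPodNamesAreSuperset(scaledUp, initial) // true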
3 changes: 3 additions & 0 deletions test/util/util.go
@@ -129,6 +129,9 @@ func DeleteNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace)
if err := DeleteAllJobsInNamespace(ctx, c, ns); err != nil {
return err
}
if err := DeleteAllRayJobsInNamespace(ctx, c, ns); err != nil {
return err
}
if err := DeleteAllTrainingRuntimesInNamespace(ctx, c, ns); err != nil {
return err
}
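DeleteAllRayJobsInNamespace is called here, but its body is not part of this diff. For orientation only, a rough sketch of what such a helper typically looks like, assuming controller-runtime's DeleteAllOf and the apimachinery errors package; this is an assumption, not the repository's actual implementation:

// Hypothetical sketch; the real helper lives elsewhere in test/util.
// Assumed imports: context, fmt, apierrors "k8s.io/apimachinery/pkg/api/errors",
// rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1".
func DeleteAllRayJobsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error {
	if err := c.DeleteAllOf(ctx, &rayv1.RayJob{}, client.InNamespace(ns.Name)); err != nil && !apierrors.IsNotFound(err) {
		return fmt.Errorf("deleting RayJobs in namespace %q: %w", ns.Name, err)
	}
	return nil
}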