
Commit 7f94988

Fix crash in podgroup when runLauncherAsWorker is true (#669)
* Fix crash in podgroup when runLauncherAsWorker is true
  Signed-off-by: GonzaloSaez <[email protected]>

* Address comments
  Signed-off-by: GonzaloSaez <[email protected]>

---------

Signed-off-by: GonzaloSaez <[email protected]>
1 parent d0ab239 commit 7f94988

File tree: 2 files changed (+111 -1 lines changed)


pkg/controller/podgroup.go  (+6 -1)
@@ -356,7 +356,12 @@ func calPGMinResource(minMember *int32, mpiJob *kubeflow.MPIJob, pcLister schedu
 
 	sort.Sort(sort.Reverse(order))
 	// Launcher + Worker > minMember
-	if minMember != nil && *order[0].Replicas+*order[1].Replicas > *minMember {
+	replicas := *order[0].Replicas
+	if len(order) > 1 {
+		// When using runLauncherAsWorker, there may be no worker.
+		replicas += *order[1].Replicas
+	}
+	if minMember != nil && replicas > *minMember {
 		// If the launcher and workers have the same priority, it treats workers as a lower priority.
 		if order[0].priority == order[1].priority {
 			wIndex := order.getWorkerIndex()
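The removed line indexed order[1] unconditionally. With RunLauncherAsWorker enabled an MPIJob may declare only the launcher replica, so the priority-sorted order slice has a single element and *order[1].Replicas panics with an index out of range. A minimal sketch of the guarded sum, assuming a hypothetical replicaEntry type and countedReplicas helper rather than the controller's real replicasOrder and calPGMinResource:

package main

import "fmt"

// replicaEntry is a hypothetical stand-in for one element of the controller's
// replica order after sort.Sort(sort.Reverse(order)) (highest priority first).
type replicaEntry struct {
	Replicas *int32
}

// countedReplicas reproduces only the guard from the fix: order[1] is read
// solely when a second replica type exists, so a launcher-only job no longer
// panics.
func countedReplicas(order []replicaEntry, minMember *int32) int32 {
	replicas := *order[0].Replicas
	if len(order) > 1 {
		// When using runLauncherAsWorker, there may be no worker.
		replicas += *order[1].Replicas
	}
	if minMember != nil && replicas > *minMember {
		// The real controller goes on to trim lower-priority workers here;
		// this sketch simply caps the count at minMember.
		return *minMember
	}
	return replicas
}

func main() {
	minMember := int32(1)
	one := int32(1)
	four := int32(4)

	// Launcher-only job (runLauncherAsWorker): the old code read order[1]
	// here and crashed.
	fmt.Println(countedReplicas([]replicaEntry{{Replicas: &one}}, &minMember)) // 1

	// Two replica types whose total exceeds minMember: capped at 1.
	fmt.Println(countedReplicas([]replicaEntry{{Replicas: &four}, {Replicas: &one}}, &minMember)) // 1
}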

pkg/controller/podgroup_test.go  (+105)
@@ -38,6 +38,11 @@ var (
 		corev1.ResourceMemory: resource.MustParse("512Gi"),
 		"example.com/gpu": resource.MustParse("40"),
 	}
+
+	minResourcesNoMinMember = &corev1.ResourceList{
+		corev1.ResourceCPU: resource.MustParse("1"),
+		corev1.ResourceMemory: resource.MustParse("2Gi"),
+	}
 )
 
 func TestNewPodGroup(t *testing.T) {
@@ -208,6 +213,73 @@ func TestNewPodGroup(t *testing.T) {
 				},
 			},
 		},
+		"no worker no MinResources": {
+			mpiJob: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test",
+					Annotations: map[string]string{
+						volcanov1beta1.QueueNameAnnotationKey: "project-x",
+					},
+				},
+				Spec: kubeflow.MPIJobSpec{
+					RunLauncherAsWorker: ptr.To[bool](true),
+					RunPolicy: kubeflow.RunPolicy{
+						SchedulingPolicy: &kubeflow.SchedulingPolicy{
+							MinAvailable: ptr.To[int32](1),
+							Queue: "project-y",
+							PriorityClass: "high",
+							ScheduleTimeoutSeconds: ptr.To[int32](100),
+						},
+					},
+					MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
+						kubeflow.MPIReplicaTypeLauncher: {
+							Replicas: ptr.To[int32](1),
+							Template: corev1.PodTemplateSpec{
+								Spec: corev1.PodSpec{
+									Containers: []corev1.Container{{
+										Resources: corev1.ResourceRequirements{
+											Requests: corev1.ResourceList{
+												corev1.ResourceCPU: resource.MustParse("1"),
+												corev1.ResourceMemory: resource.MustParse("2Gi"),
+											},
+										},
+									}},
+								},
+							},
+						},
+					},
+				},
+			},
+			wantVolcanoPG: &volcanov1beta1.PodGroup{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: volcanov1beta1.SchemeGroupVersion.String(),
+					Kind: "PodGroup",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test",
+				},
+				Spec: volcanov1beta1.PodGroupSpec{
+					MinMember: 1,
+					Queue: "project-y",
+					PriorityClassName: "high",
+					MinResources: minResourcesNoMinMember,
+				},
+			},
+			wantSchedPG: &schedv1alpha1.PodGroup{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: schedv1alpha1.SchemeGroupVersion.String(),
+					Kind: "PodGroup",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test",
+				},
+				Spec: schedv1alpha1.PodGroupSpec{
+					MinMember: 1,
+					MinResources: *minResourcesNoMinMember,
+					ScheduleTimeoutSeconds: ptr.To[int32](100),
+				},
+			},
+		},
 	}
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
@@ -447,6 +519,39 @@ func TestCalculatePGMinResources(t *testing.T) {
 				corev1.ResourceMemory: resource.MustParse("65Gi"),
 			},
 		},
+		"without worker without priorityClass": {
+			minMember: 1,
+			job: &kubeflow.MPIJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test",
+				},
+				Spec: kubeflow.MPIJobSpec{
+					MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
+						kubeflow.MPIReplicaTypeLauncher: {
+							Replicas: ptr.To[int32](1),
+							Template: corev1.PodTemplateSpec{
+								Spec: corev1.PodSpec{
+									Containers: []corev1.Container{
+										{
+											Resources: corev1.ResourceRequirements{
+												Requests: corev1.ResourceList{
+													corev1.ResourceCPU: resource.MustParse("2"),
+													corev1.ResourceMemory: resource.MustParse("1Gi"),
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			want: &corev1.ResourceList{
+				corev1.ResourceCPU: resource.MustParse("2"),
+				corev1.ResourceMemory: resource.MustParse("1Gi"),
+			},
+		},
 	}
 	for name, tc := range volcanoTests {
 		t.Run(name, func(t *testing.T) {
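Both new cases slot into the existing table-driven tests (TestNewPodGroup and TestCalculatePGMinResources), so they run with the standard Go tooling; assuming a local checkout of the repository, a filter along these lines exercises just the affected tables (the exact invocation is not part of the PR):

go test ./pkg/controller/... -run 'TestNewPodGroup|TestCalculatePGMinResources'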
