Skip to content

Commit 8e4bc47

Browse files
Ambient Code Botclaude
andcommitted
fix(jobset): delete stale suspended JobSet on runtime upgrade
When a TrainJob is suspended before an RHOAI upgrade (e.g., via Kueue ClusterQueue stopPolicy: Hold) and the ClusterTrainingRuntime spec changes during the upgrade (e.g., a new container image), the Trainer controller fails post-upgrade with:

admission webhook "vjobset.kb.io" denied the request: spec.replicatedJobs: Invalid value: ...: field is immutable

In Build(), add a check: if the existing JobSet is suspended and replicatedJobsSpecChanged() detects a difference in container images or job names, delete the stale JobSet using foreground propagation and return nil. The controller re-queues and recreates the JobSet with the correct post-upgrade spec on the next reconcile cycle.

Add TestBuild_ImmutableJobSetUpgrade covering the upgrade regression and three boundary cases (same image, running JobSet, no JobSet).

Fixes RHOAIENG-48867

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 8fb40b1 commit 8e4bc47

2 files changed

Lines changed: 232 additions & 0 deletions

File tree

pkg/runtime/framework/plugins/jobset/jobset.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,25 @@ func (j *JobSet) Build(ctx context.Context, info *runtime.Info, trainJob *traine
280280
}
281281
}
282282

283+
// RHOAIENG-48867: If the existing JobSet is suspended and spec.replicatedJobs has
284+
// changed (e.g., container images updated during a ClusterTrainingRuntime upgrade),
285+
// delete the stale JobSet so it can be recreated with the new spec on the next
286+
// reconcile cycle. This is required because spec.replicatedJobs is immutable in
287+
// JobSet and cannot be updated in-place — the admission webhook rejects such updates.
288+
if oldJobSet != nil && ptr.Deref(oldJobSet.Spec.Suspend, false) &&
289+
replicatedJobsSpecChanged(oldJobSet.Spec.ReplicatedJobs, jobSetSpec.ReplicatedJobs) {
290+
j.logger.Info("Deleting stale suspended JobSet: spec.replicatedJobs changed post-upgrade, will recreate",
291+
"jobSet", client.ObjectKeyFromObject(trainJob))
292+
// Use foreground propagation so any owned child resources are cleaned up
293+
// before the JobSet object itself is removed.
294+
if err := j.client.Delete(ctx, oldJobSet,
295+
client.PropagationPolicy(metav1.DeletePropagationForeground),
296+
); client.IgnoreNotFound(err) != nil {
297+
return nil, err
298+
}
299+
return nil, nil
300+
}
301+
283302
// Init the JobSet apply configuration from the runtime template spec
284303
jobSetBuilder := NewBuilder(jobsetv1alpha2ac.JobSet(trainJob.Name, trainJob.Namespace).
285304
WithLabels(maps.Clone(info.Labels)).
@@ -306,6 +325,51 @@ func (j *JobSet) Build(ctx context.Context, info *runtime.Info, trainJob *traine
306325
return []apiruntime.ApplyConfiguration{jobSet}, nil
307326
}
308327

328+
// replicatedJobsSpecChanged reports whether the desired ReplicatedJobs differ from
329+
// the existing JobSet's replicatedJobs in ways that would trigger the immutability
330+
// constraint (different count, names, or container images). This covers the upgrade
331+
// scenario where a ClusterTrainingRuntime change updates the runtime container image.
332+
// RHOAIENG-48867
333+
func replicatedJobsSpecChanged(
334+
existing []jobsetv1alpha2.ReplicatedJob,
335+
desired []jobsetv1alpha2ac.ReplicatedJobApplyConfiguration,
336+
) bool {
337+
if len(existing) != len(desired) {
338+
return true
339+
}
340+
existingByName := make(map[string]*jobsetv1alpha2.ReplicatedJob, len(existing))
341+
for i := range existing {
342+
existingByName[existing[i].Name] = &existing[i]
343+
}
344+
for _, d := range desired {
345+
if d.Name == nil {
346+
continue
347+
}
348+
e, ok := existingByName[*d.Name]
349+
if !ok {
350+
// A replicated job with this name does not exist in the current JobSet.
351+
return true
352+
}
353+
if d.Template == nil || d.Template.Spec == nil ||
354+
d.Template.Spec.Template == nil || d.Template.Spec.Template.Spec == nil {
355+
continue
356+
}
357+
existingImages := make(map[string]string, len(e.Template.Spec.Template.Spec.Containers))
358+
for _, c := range e.Template.Spec.Template.Spec.Containers {
359+
existingImages[c.Name] = c.Image
360+
}
361+
for _, c := range d.Template.Spec.Template.Spec.Containers {
362+
if c.Name == nil || c.Image == nil {
363+
continue
364+
}
365+
if img, found := existingImages[*c.Name]; !found || img != *c.Image {
366+
return true
367+
}
368+
}
369+
}
370+
return false
371+
}
372+
309373
func (j *JobSet) Status(ctx context.Context, trainJob *trainer.TrainJob) (*trainer.TrainJobStatus, error) {
310374
jobSet := &jobsetv1alpha2.JobSet{}
311375
if err := j.client.Get(ctx, client.ObjectKeyFromObject(trainJob), jobSet); err != nil {

pkg/runtime/framework/plugins/jobset/jobset_test.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/google/go-cmp/cmp"
2525
"github.com/google/go-cmp/cmp/cmpopts"
26+
batchv1 "k8s.io/api/batch/v1"
2627
corev1 "k8s.io/api/core/v1"
2728
apierrors "k8s.io/apimachinery/pkg/api/errors"
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -47,6 +48,173 @@ import (
4748
// TODO: Add tests for all Interfaces.
4849
// REF: https://github.com/kubeflow/trainer/issues/2468
4950

51+
// TestBuild_ImmutableJobSetUpgrade validates the fix for RHOAIENG-48867:
52+
// when a suspended JobSet's spec.replicatedJobs has changed (e.g., after a
53+
// ClusterTrainingRuntime image upgrade), Build() must delete the stale JobSet
54+
// and return nil so the controller recreates it on the next reconcile cycle.
55+
func TestBuild_ImmutableJobSetUpgrade(t *testing.T) {
56+
const (
57+
oldImage = "registry.example.com/trainer:3.2"
58+
newImage = "registry.example.com/trainer:3.3"
59+
)
60+
61+
makeInfo := func(image string) *runtime.Info {
62+
return &runtime.Info{
63+
Scheduler: &runtime.Scheduler{},
64+
TemplateSpec: runtime.TemplateSpec{
65+
PodSets: []runtime.PodSet{
66+
{
67+
Name: constants.Node,
68+
Count: ptr.To[int32](1),
69+
Containers: make([]runtime.Container, 1),
70+
},
71+
},
72+
ObjApply: jobsetv1alpha2ac.JobSetSpec().
73+
WithReplicatedJobs(
74+
jobsetv1alpha2ac.ReplicatedJob().
75+
WithName(constants.Node).
76+
WithTemplate(batchv1ac.JobTemplateSpec().
77+
WithSpec(batchv1ac.JobSpec().
78+
WithParallelism(1).
79+
WithCompletions(1).
80+
WithTemplate(corev1ac.PodTemplateSpec().
81+
WithSpec(corev1ac.PodSpec().
82+
WithContainers(
83+
corev1ac.Container().
84+
WithName(constants.Node).
85+
WithImage(image),
86+
),
87+
),
88+
),
89+
),
90+
),
91+
),
92+
},
93+
}
94+
}
95+
96+
makeSuspendedJobSet := func(image string) *jobsetv1alpha2.JobSet {
97+
return &jobsetv1alpha2.JobSet{
98+
ObjectMeta: metav1.ObjectMeta{
99+
Name: "test",
100+
Namespace: metav1.NamespaceDefault,
101+
},
102+
Spec: jobsetv1alpha2.JobSetSpec{
103+
Suspend: ptr.To(true),
104+
ReplicatedJobs: []jobsetv1alpha2.ReplicatedJob{
105+
{
106+
Name: constants.Node,
107+
Template: batchv1.JobTemplateSpec{
108+
Spec: batchv1.JobSpec{
109+
Template: corev1.PodTemplateSpec{
110+
Spec: corev1.PodSpec{
111+
Containers: []corev1.Container{
112+
{Name: constants.Node, Image: image},
113+
},
114+
},
115+
},
116+
},
117+
},
118+
},
119+
},
120+
},
121+
}
122+
}
123+
124+
cases := map[string]struct {
125+
trainJob *trainer.TrainJob
126+
existingJobSet *jobsetv1alpha2.JobSet
127+
info *runtime.Info
128+
wantNilResult bool // true if Build() should return no apply configs
129+
wantJobSetDeleted bool // true if the existing JobSet should be deleted
130+
wantError error
131+
}{
132+
"suspended JobSet with changed image is deleted on upgrade (RHOAIENG-48867)": {
133+
trainJob: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
134+
Suspend(false).
135+
Obj(),
136+
existingJobSet: makeSuspendedJobSet(oldImage),
137+
info: makeInfo(newImage),
138+
wantNilResult: true,
139+
wantJobSetDeleted: true,
140+
},
141+
"suspended JobSet with unchanged image is not deleted": {
142+
trainJob: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
143+
Suspend(false).
144+
Obj(),
145+
existingJobSet: makeSuspendedJobSet(newImage),
146+
info: makeInfo(newImage),
147+
wantNilResult: false,
148+
wantJobSetDeleted: false,
149+
},
150+
"running JobSet (not suspended) is not touched when TrainJob is also running": {
151+
trainJob: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
152+
Suspend(false).
153+
Obj(),
154+
existingJobSet: func() *jobsetv1alpha2.JobSet {
155+
js := makeSuspendedJobSet(oldImage)
156+
js.Spec.Suspend = ptr.To(false)
157+
return js
158+
}(),
159+
info: makeInfo(newImage),
160+
wantNilResult: true, // existing guard skips update when both are running
161+
wantJobSetDeleted: false,
162+
},
163+
"no existing JobSet results in a new apply config": {
164+
trainJob: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
165+
Suspend(false).
166+
Obj(),
167+
existingJobSet: nil,
168+
info: makeInfo(newImage),
169+
wantNilResult: false,
170+
wantJobSetDeleted: false,
171+
},
172+
}
173+
174+
for name, tc := range cases {
175+
t.Run(name, func(t *testing.T) {
176+
_, ctx := ktesting.NewTestContext(t)
177+
var cancel func()
178+
ctx, cancel = context.WithCancel(ctx)
179+
t.Cleanup(cancel)
180+
181+
clientBuilder := utiltesting.NewClientBuilder()
182+
if tc.existingJobSet != nil {
183+
clientBuilder = clientBuilder.WithObjects(tc.existingJobSet)
184+
}
185+
cli := clientBuilder.Build()
186+
187+
p, err := New(ctx, cli, nil)
188+
if err != nil {
189+
t.Fatalf("Failed to initialize JobSet plugin: %v", err)
190+
}
191+
192+
objs, err := p.(framework.ComponentBuilderPlugin).Build(ctx, tc.info, tc.trainJob)
193+
if diff := cmp.Diff(tc.wantError, err, cmpopts.EquateErrors()); diff != "" {
194+
t.Errorf("Unexpected error (-want,+got):\n%s", diff)
195+
}
196+
if tc.wantNilResult && len(objs) != 0 {
197+
t.Errorf("Expected Build() to return no apply configs, got %d", len(objs))
198+
}
199+
if !tc.wantNilResult && len(objs) == 0 {
200+
t.Errorf("Expected Build() to return apply configs, got none")
201+
}
202+
// Verify whether the existing JobSet was deleted.
203+
if tc.existingJobSet != nil {
204+
js := &jobsetv1alpha2.JobSet{}
205+
getErr := cli.Get(ctx, client.ObjectKeyFromObject(tc.existingJobSet), js)
206+
wasDeleted := apierrors.IsNotFound(getErr)
207+
if tc.wantJobSetDeleted && !wasDeleted {
208+
t.Errorf("Expected existing JobSet to be deleted, but it still exists")
209+
}
210+
if !tc.wantJobSetDeleted && wasDeleted {
211+
t.Errorf("Expected existing JobSet to remain, but it was deleted")
212+
}
213+
}
214+
})
215+
}
216+
}
217+
50218
func TestJobSet(t *testing.T) {
51219
cases := map[string]struct {
52220
trainJob *trainer.TrainJob

0 commit comments

Comments
 (0)