Skip to content

Commit a8765c1

Browse files
Ambient Code Botclaude
andcommitted
feat(controller): emit JobSetRecreating event and add upgrade integration test
- In Reconcile(), detect the transient state where reconcileObjects() succeeds but the JobSet is NotFound and TrainJob is not suspended: this is the cycle after Build() deleted a stale suspended JobSet due to immutable spec.replicatedJobs change (RHOAIENG-48867). Record a Normal "JobSetRecreating" Kubernetes Event so operators can observe the automatic recreation via `kubectl describe trainjob`. - Add integration test "Should delete and recreate stale suspended JobSet when runtime image changes post-upgrade" in test/integration/controller/trainjob_controller_test.go, covering the full end-to-end scenario: pre-upgrade state (suspended TrainJob + suspended JobSet with old image) → runtime image update → TrainJob resumed → JobSet deleted and recreated with new image. Fixes RHOAIENG-48867 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 8e4bc47 commit a8765c1

2 files changed

Lines changed: 109 additions & 0 deletions

File tree

pkg/controller/trainjob_controller.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/go-logr/logr"
2727
corev1 "k8s.io/api/core/v1"
2828
"k8s.io/apimachinery/pkg/api/equality"
29+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2930
"k8s.io/apimachinery/pkg/api/meta"
3031
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3132
"k8s.io/client-go/tools/record"
@@ -137,6 +138,15 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
137138
setSuspendedCondition(&trainJob)
138139

139140
if statusErr := setTrainJobStatus(ctx, runtime, &trainJob); statusErr != nil {
141+
// RHOAIENG-48867: When Build() deleted a stale suspended JobSet due to an
142+
// immutable spec.replicatedJobs change (e.g., post-upgrade image update), the
143+
// JobSet is transiently absent. Detect this by: reconcileObjects succeeded (err==nil),
144+
// the TrainJob is not suspended, and the JobSet is NotFound. Record a Normal event
145+
// so operators can observe the automatic recreation.
146+
if err == nil && apierrors.IsNotFound(statusErr) && !ptr.Deref(trainJob.Spec.Suspend, false) {
147+
r.recorder.Event(&trainJob, corev1.EventTypeNormal, "JobSetRecreating",
148+
"Stale suspended JobSet deleted and will be recreated due to runtime spec changes post-upgrade")
149+
}
140150
err = errors.Join(err, statusErr)
141151
}
142152

test/integration/controller/trainjob_controller_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1436,4 +1436,103 @@ alpha-node-0-1.alpha slots=8
14361436
})
14371437
})
14381438
})
1439+
1440+
// RHOAIENG-48867: Integration test for the upgrade scenario where a suspended
1441+
// TrainJob's ClusterTrainingRuntime image changes, requiring a delete-recreate
1442+
// of the immutable spec.replicatedJobs field in the JobSet.
1443+
ginkgo.When("Reconciling TrainJob after runtime image upgrade (RHOAIENG-48867)", func() {
1444+
var (
1445+
clTrainingRuntime *trainer.ClusterTrainingRuntime
1446+
trainJob *trainer.TrainJob
1447+
trainJobKey client.ObjectKey
1448+
)
1449+
1450+
const (
1451+
preUpgradeImage = "registry.example.com/trainer:pre-upgrade"
1452+
postUpgradeImage = "registry.example.com/trainer:post-upgrade"
1453+
)
1454+
1455+
ginkgo.AfterEach(func() {
1456+
gomega.Expect(k8sClient.DeleteAllOf(ctx, &trainer.TrainJob{}, client.InNamespace(ns.Name))).Should(gomega.Succeed())
1457+
gomega.Expect(k8sClient.DeleteAllOf(ctx, &trainer.ClusterTrainingRuntime{})).Should(gomega.Succeed())
1458+
})
1459+
1460+
ginkgo.BeforeEach(func() {
1461+
clTrainingRuntime = testingutil.MakeClusterTrainingRuntimeWrapper("upgrade-runtime").
1462+
RuntimeSpec(
1463+
testingutil.MakeTrainingRuntimeSpecWrapper(
1464+
testingutil.MakeClusterTrainingRuntimeWrapper("upgrade-runtime").Spec,
1465+
).
1466+
Container(constants.Node, constants.Node, preUpgradeImage, nil, nil, nil).
1467+
Obj(),
1468+
).
1469+
Obj()
1470+
1471+
trainJob = testingutil.MakeTrainJobWrapper(ns.Name, "upgrade-trainjob").
1472+
Suspend(true).
1473+
RuntimeRef(trainer.GroupVersion.WithKind(trainer.ClusterTrainingRuntimeKind), clTrainingRuntime.Name).
1474+
Obj()
1475+
trainJobKey = client.ObjectKeyFromObject(trainJob)
1476+
})
1477+
1478+
ginkgo.It("Should delete and recreate stale suspended JobSet when runtime image changes post-upgrade", func() {
1479+
ginkgo.By("Creating ClusterTrainingRuntime with pre-upgrade image")
1480+
gomega.Expect(k8sClient.Create(ctx, clTrainingRuntime)).Should(gomega.Succeed())
1481+
gomega.Eventually(func(g gomega.Gomega) {
1482+
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clTrainingRuntime), clTrainingRuntime)).Should(gomega.Succeed())
1483+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
1484+
1485+
ginkgo.By("Creating suspended TrainJob (simulating pre-upgrade state)")
1486+
gomega.Expect(k8sClient.Create(ctx, trainJob)).Should(gomega.Succeed())
1487+
1488+
ginkgo.By("Waiting for suspended JobSet to be created with pre-upgrade image")
1489+
gomega.Eventually(func(g gomega.Gomega) {
1490+
jobSet := &jobsetv1alpha2.JobSet{}
1491+
g.Expect(k8sClient.Get(ctx, trainJobKey, jobSet)).Should(gomega.Succeed())
1492+
g.Expect(ptr.Deref(jobSet.Spec.Suspend, false)).Should(gomega.BeTrue())
1493+
for _, rJob := range jobSet.Spec.ReplicatedJobs {
1494+
if rJob.Name == constants.Node {
1495+
for _, c := range rJob.Template.Spec.Template.Spec.Containers {
1496+
if c.Name == constants.Node {
1497+
g.Expect(c.Image).Should(gomega.Equal(preUpgradeImage))
1498+
}
1499+
}
1500+
}
1501+
}
1502+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
1503+
1504+
ginkgo.By("Simulating upgrade: updating ClusterTrainingRuntime with post-upgrade image")
1505+
gomega.Eventually(func(g gomega.Gomega) {
1506+
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clTrainingRuntime), clTrainingRuntime)).Should(gomega.Succeed())
1507+
clTrainingRuntime.Spec = testingutil.MakeTrainingRuntimeSpecWrapper(clTrainingRuntime.Spec).
1508+
Container(constants.Node, constants.Node, postUpgradeImage, nil, nil, nil).
1509+
Obj()
1510+
g.Expect(k8sClient.Update(ctx, clTrainingRuntime)).Should(gomega.Succeed())
1511+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
1512+
1513+
ginkgo.By("Resuming TrainJob post-upgrade (simulating Kueue admitting the workload)")
1514+
gomega.Eventually(func(g gomega.Gomega) {
1515+
g.Expect(k8sClient.Get(ctx, trainJobKey, trainJob)).Should(gomega.Succeed())
1516+
trainJob.Spec.Suspend = ptr.To(false)
1517+
g.Expect(k8sClient.Update(ctx, trainJob)).Should(gomega.Succeed())
1518+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
1519+
1520+
ginkgo.By("Verifying the JobSet is recreated with the post-upgrade image and is not suspended")
1521+
gomega.Eventually(func(g gomega.Gomega) {
1522+
jobSet := &jobsetv1alpha2.JobSet{}
1523+
g.Expect(k8sClient.Get(ctx, trainJobKey, jobSet)).Should(gomega.Succeed())
1524+
g.Expect(ptr.Deref(jobSet.Spec.Suspend, false)).Should(gomega.BeFalse())
1525+
for _, rJob := range jobSet.Spec.ReplicatedJobs {
1526+
if rJob.Name == constants.Node {
1527+
for _, c := range rJob.Template.Spec.Template.Spec.Containers {
1528+
if c.Name == constants.Node {
1529+
g.Expect(c.Image).Should(gomega.Equal(postUpgradeImage),
1530+
"JobSet should have post-upgrade image after delete-recreate")
1531+
}
1532+
}
1533+
}
1534+
}
1535+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
1536+
})
1537+
})
14391538
})

0 commit comments

Comments
 (0)