Skip to content

Commit 40ae060

Browse files
fix: add client reader to avoid watch on pods
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
1 parent 3a77e8b commit 40ae060

3 files changed

Lines changed: 29 additions & 24 deletions

File tree

pkg/controller/setup.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ func SetupControllers(mgr ctrl.Manager, runtimes map[string]runtime.Runtime, opt
4141
}
4242
if err := NewTrainJobReconciler(
4343
mgr.GetClient(),
44+
mgr.GetAPIReader(),
4445
mgr.GetEventRecorderFor("trainer-trainjob-controller"),
4546
runtimes,
4647
WithWatchers(runtimeRec, clRuntimeRec),

pkg/controller/trainjob_controller.go

Lines changed: 15 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -52,11 +52,12 @@ type TrainJobWatcher interface {
5252
}
5353

5454
type TrainJobReconciler struct {
55-
log logr.Logger
56-
client client.Client
57-
recorder record.EventRecorder
58-
runtimes map[string]jobruntimes.Runtime
59-
watchers iter.Seq[TrainJobWatcher]
55+
log logr.Logger
56+
client client.Client
57+
apiReader client.Reader
58+
recorder record.EventRecorder
59+
runtimes map[string]jobruntimes.Runtime
60+
watchers iter.Seq[TrainJobWatcher]
6061
}
6162

6263
type TrainJobReconcilerOptions struct {
@@ -74,17 +75,18 @@ func WithWatchers(watchers ...TrainJobWatcher) TrainJobReconcilerOption {
7475
var _ reconcile.Reconciler = (*TrainJobReconciler)(nil)
7576
var _ predicate.TypedPredicate[*trainer.TrainJob] = (*TrainJobReconciler)(nil)
7677

77-
func NewTrainJobReconciler(client client.Client, recorder record.EventRecorder, runtimes map[string]jobruntimes.Runtime, opts ...TrainJobReconcilerOption) *TrainJobReconciler {
78+
func NewTrainJobReconciler(client client.Client, apiReader client.Reader, recorder record.EventRecorder, runtimes map[string]jobruntimes.Runtime, opts ...TrainJobReconcilerOption) *TrainJobReconciler {
7879
options := &TrainJobReconcilerOptions{}
7980
for _, opt := range opts {
8081
opt(options)
8182
}
8283
return &TrainJobReconciler{
83-
log: ctrl.Log.WithName("trainjob-controller"),
84-
client: client,
85-
recorder: recorder,
86-
runtimes: runtimes,
87-
watchers: options.Watchers,
84+
log: ctrl.Log.WithName("trainjob-controller"),
85+
client: client,
86+
apiReader: apiReader,
87+
recorder: recorder,
88+
runtimes: runtimes,
89+
watchers: options.Watchers,
8890
}
8991
}
9092

@@ -141,8 +143,8 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
141143
return ctrl.Result{}, errors.Join(err, r.client.Status().Update(ctx, &trainJob))
142144
}
143145

144-
// RHAI progression tracking
145-
result, progressionErr := progression.ReconcileProgression(ctx, r.client, log, &trainJob)
146+
// RHAI progression tracking (use APIReader to avoid pod watches)
147+
result, progressionErr := progression.ReconcileProgression(ctx, r.client, r.apiReader, log, &trainJob)
146148
return result, errors.Join(err, progressionErr)
147149
}
148150

pkg/rhai/progression/progression.go

Lines changed: 13 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -103,7 +103,9 @@ func isPodReady(pod *corev1.Pod) bool {
103103
return false
104104
}
105105

106-
func GetPrimaryPod(ctx context.Context, c client.Client, trainJob *trainer.TrainJob) (*corev1.Pod, error) {
106+
// GetPrimaryPod returns the first running and ready pod with an IP for a TrainJob.
107+
// Uses the provided reader (typically APIReader) to avoid setting up pod informers/watchers.
108+
func GetPrimaryPod(ctx context.Context, reader client.Reader, trainJob *trainer.TrainJob) (*corev1.Pod, error) {
107109
// First, try to find the rank 0 pod (primary worker or launcher)
108110
podList := &corev1.PodList{}
109111

@@ -140,7 +142,7 @@ func GetPrimaryPod(ctx context.Context, c client.Client, trainJob *trainer.Train
140142
// Try each label selector pattern
141143
for _, labelSet := range labelSets {
142144
labelSelector := labels.SelectorFromSet(labelSet)
143-
if err := c.List(ctx, podList, &client.ListOptions{
145+
if err := reader.List(ctx, podList, &client.ListOptions{
144146
Namespace: trainJob.Namespace,
145147
LabelSelector: labelSelector,
146148
}); err != nil {
@@ -160,7 +162,7 @@ func GetPrimaryPod(ctx context.Context, c client.Client, trainJob *trainer.Train
160162
labelSelector := labels.SelectorFromSet(labels.Set{
161163
"training.kubeflow.org/job-name": trainJob.Name,
162164
})
163-
if err := c.List(ctx, podList, &client.ListOptions{
165+
if err := reader.List(ctx, podList, &client.ListOptions{
164166
Namespace: trainJob.Namespace,
165167
LabelSelector: labelSelector,
166168
}); err != nil {
@@ -172,7 +174,7 @@ func GetPrimaryPod(ctx context.Context, c client.Client, trainJob *trainer.Train
172174
labelSelector = labels.SelectorFromSet(labels.Set{
173175
"jobset.sigs.k8s.io/jobset-name": trainJob.Name,
174176
})
175-
if err := c.List(ctx, podList, &client.ListOptions{
177+
if err := reader.List(ctx, podList, &client.ListOptions{
176178
Namespace: trainJob.Namespace,
177179
LabelSelector: labelSelector,
178180
}); err != nil {
@@ -390,12 +392,12 @@ func UpdateTrainerStatusAnnotation(trainJob *trainer.TrainJob, status *Annotatio
390392
return nil
391393
}
392394

393-
func PollAndUpdateProgress(ctx context.Context, c client.Client, trainJob *trainer.TrainJob) (bool, error) {
395+
func PollAndUpdateProgress(ctx context.Context, c client.Client, reader client.Reader, trainJob *trainer.TrainJob) (bool, error) {
394396
if !IsProgressionTrackingEnabled(trainJob) {
395397
return false, nil
396398
}
397399

398-
pod, err := GetPrimaryPod(ctx, c, trainJob)
400+
pod, err := GetPrimaryPod(ctx, reader, trainJob)
399401
if err != nil {
400402
return false, fmt.Errorf("primary pod not available: %w", err)
401403
}
@@ -471,13 +473,13 @@ func IsFinalStatusCaptured(trainJob *trainer.TrainJob) bool {
471473
return false
472474
}
473475

474-
func PollAndUpdateFinalProgress(ctx context.Context, c client.Client, trainJob *trainer.TrainJob, completed bool) (bool, error) {
476+
func PollAndUpdateFinalProgress(ctx context.Context, c client.Client, reader client.Reader, trainJob *trainer.TrainJob, completed bool) (bool, error) {
475477
if !IsProgressionTrackingEnabled(trainJob) {
476478
return false, nil
477479
}
478480

479481
// Try to get final metrics from pod if it still exists
480-
pod, err := GetPrimaryPod(ctx, c, trainJob)
482+
pod, err := GetPrimaryPod(ctx, reader, trainJob)
481483
if err == nil {
482484
metricsPort := GetMetricsPort(trainJob)
483485
if status, pollErr := PollTrainingProgress(ctx, pod, metricsPort); pollErr == nil {
@@ -674,7 +676,7 @@ func InjectPreStopHook(podSpec *corev1.PodSpec, trainJob *trainer.TrainJob) erro
674676
// ReconcileProgression handles progression tracking during TrainJob reconciliation.
675677
// Returns ctrl.Result for requeue behavior and any errors encountered.
676678
// This should be called at the end of TrainJob reconciliation when progression tracking is enabled.
677-
func ReconcileProgression(ctx context.Context, c client.Client, log logr.Logger, trainJob *trainer.TrainJob) (ctrl.Result, error) {
679+
func ReconcileProgression(ctx context.Context, c client.Client, reader client.Reader, log logr.Logger, trainJob *trainer.TrainJob) (ctrl.Result, error) {
678680
if !IsProgressionTrackingEnabled(trainJob) {
679681
return ctrl.Result{}, nil
680682
}
@@ -688,7 +690,7 @@ func ReconcileProgression(ctx context.Context, c client.Client, log logr.Logger,
688690

689691
if isRunning {
690692
// Poll metrics while job is running
691-
if _, pollErr := PollAndUpdateProgress(ctx, c, trainJob); pollErr != nil {
693+
if _, pollErr := PollAndUpdateProgress(ctx, c, reader, trainJob); pollErr != nil {
692694
log.V(1).Info("Failed to poll training progress", "error", pollErr)
693695
} else {
694696
log.V(2).Info("Successfully updated training progress")
@@ -702,7 +704,7 @@ func ReconcileProgression(ctx context.Context, c client.Client, log logr.Logger,
702704
if (isCompleted || isFailed) && !IsFinalStatusCaptured(trainJob) {
703705
// Job just completed/failed - capture final metrics
704706
// PreStop hook keeps pod alive, so this should succeed
705-
captured, pollErr := PollAndUpdateFinalProgress(ctx, c, trainJob, isCompleted)
707+
captured, pollErr := PollAndUpdateFinalProgress(ctx, c, reader, trainJob, isCompleted)
706708
if pollErr != nil {
707709
log.V(1).Info("Failed to capture final training progress, will retry", "error", pollErr, "completed", isCompleted)
708710
// Requeue quickly - pod should still be alive in preStop window

0 commit comments

Comments
 (0)