Skip to content

Commit d4296e7

Browse files
committed
adds an option to specify whether informers should fail on error, which defaults to false to maintain existing compatibility
1 parent 400b9ff commit d4296e7

File tree

3 files changed

+34
-27
lines changed

3 files changed

+34
-27
lines changed

cmd/mpi-operator/app/options/options.go

+5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type ServerOption struct {
3838
LockNamespace string
3939
QPS int
4040
Burst int
41+
FailInformersOnErr bool
4142
}
4243

4344
// NewServerOption creates a new CMServer with a default config.
@@ -75,4 +76,8 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
7576

7677
fs.IntVar(&s.QPS, "kube-api-qps", 5, "QPS indicates the maximum QPS to the master from this client.")
7778
fs.IntVar(&s.Burst, "kube-api-burst", 10, "Maximum burst for throttle.")
79+
80+
fs.BoolVar(&s.FailInformersOnErr, "fail-informers-on-error", false,
81+
`Exit the mpi-operator if it fails to list or watch objects from the API server due to lack of permissions (instead of retrying indefinitely).
82+
Note: This only applies if a list/watch fails with Unauthorized or Forbidden errors.`)
7883
}

cmd/mpi-operator/app/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func Run(opt *options.ServerOption) error {
153153
kubeInformerFactory.Core().V1().Pods(),
154154
kubeInformerFactory.Scheduling().V1().PriorityClasses(),
155155
kubeflowInformerFactory.Kubeflow().V2beta1().MPIJobs(),
156-
namespace, opt.GangSchedulingName)
156+
namespace, opt.GangSchedulingName, opt.FailInformersOnErr)
157157
if err != nil {
158158
klog.Fatalf("Failed to setup the controller")
159159
}

pkg/controller/mpi_job_controller.go

+28-26
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,10 @@ func NewMPIJobController(
272272
podInformer coreinformers.PodInformer,
273273
priorityClassInformer schedulinginformers.PriorityClassInformer,
274274
mpiJobInformer informers.MPIJobInformer,
275-
namespace, gangSchedulingName string) (*MPIJobController, error) {
275+
namespace, gangSchedulingName string, failInformersOnErr bool) (*MPIJobController, error) {
276276
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClient, schedClient,
277277
configMapInformer, secretInformer, serviceInformer, jobInformer, podInformer,
278-
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName)
278+
priorityClassInformer, mpiJobInformer, &clock.RealClock{}, namespace, gangSchedulingName, failInformersOnErr)
279279
}
280280

281281
// NewMPIJobControllerWithClock returns a new MPIJob controller.
@@ -292,7 +292,7 @@ func NewMPIJobControllerWithClock(
292292
priorityClassInformer schedulinginformers.PriorityClassInformer,
293293
mpiJobInformer informers.MPIJobInformer,
294294
clock clock.WithTicker,
295-
namespace, gangSchedulingName string) (*MPIJobController, error) {
295+
namespace, gangSchedulingName string, failInformersOnErr bool) (*MPIJobController, error) {
296296

297297
// Create event broadcaster.
298298
klog.V(4).Info("Creating event broadcaster")
@@ -346,30 +346,32 @@ func NewMPIJobControllerWithClock(
346346

347347
controller.updateStatusHandler = controller.doUpdateJobStatus
348348

349-
// Set up error handlers for informers
350-
klog.Info("Setting up informer error handlers")
351-
informers := map[string]cache.SharedIndexInformer{
352-
"configMapInformer": configMapInformer.Informer(),
353-
"secretInformer": secretInformer.Informer(),
354-
"serviceInformer": serviceInformer.Informer(),
355-
"jobInformer": jobInformer.Informer(),
356-
"podInformer": podInformer.Informer(),
357-
"priorityClassInformer": priorityClassInformer.Informer(),
358-
"mpiJobInformer": mpiJobInformer.Informer(),
359-
}
360-
361-
for name, informer := range informers {
362-
err := informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
363-
// Pipe to default handler first, which just logs the error
364-
cache.DefaultWatchErrorHandler(r, err)
365-
366-
if errors.IsUnauthorized(err) || errors.IsForbidden(err) {
367-
klog.Fatalf("Unable to sync cache for informer %s: %s. Exiting.", name, err)
368-
}
369-
})
349+
// Set up error handlers for informers if asked to do so
350+
if failInformersOnErr {
351+
klog.Info("Setting up informer error handlers")
352+
informers := map[string]cache.SharedInformer{
353+
"configMapInformer": configMapInformer.Informer(),
354+
"secretInformer": secretInformer.Informer(),
355+
"serviceInformer": serviceInformer.Informer(),
356+
"jobInformer": jobInformer.Informer(),
357+
"podInformer": podInformer.Informer(),
358+
"priorityClassInformer": priorityClassInformer.Informer(),
359+
"mpiJobInformer": mpiJobInformer.Informer(),
360+
}
370361

371-
if err != nil {
372-
klog.Fatalf("Unable to set error handler for informer %s: %s. Exiting.", name, err)
362+
for name, informer := range informers {
363+
err := informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
364+
// Pipe to default handler first, which just logs the error
365+
cache.DefaultWatchErrorHandler(r, err)
366+
367+
if errors.IsUnauthorized(err) || errors.IsForbidden(err) {
368+
klog.Fatalf("Unable to sync cache for informer %s: %s. Exiting.", name, err)
369+
}
370+
})
371+
372+
if err != nil {
373+
klog.Fatalf("Unable to set error handler for informer %s: %s. Exiting.", name, err)
374+
}
373375
}
374376
}
375377

0 commit comments

Comments
 (0)