@@ -27,10 +27,14 @@ import (
2727 "path"
2828 "syscall"
2929
30+ "github.com/google/uuid"
3031 "github.com/prometheus/client_golang/prometheus"
3132 "github.com/prometheus/client_golang/prometheus/promhttp"
3233 "github.com/urfave/cli/v2"
3334
35+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36+ "k8s.io/client-go/tools/leaderelection"
37+ "k8s.io/client-go/tools/leaderelection/resourcelock"
3438 "k8s.io/component-base/logs"
3539 "k8s.io/component-base/metrics/legacyregistry"
3640 "k8s.io/klog/v2"
@@ -56,7 +60,8 @@ const (
5660)
5761
5862type Flags struct {
59- kubeClientConfig pkgflags.KubeClientConfig
63+ kubeClientConfig pkgflags.KubeClientConfig
64+ leaderElectionConfig pkgflags.LeaderElectionConfig
6065
6166 podName string
6267 namespace string
@@ -157,6 +162,7 @@ func newApp() *cli.App {
157162 },
158163 }
159164
165+ cliFlags = append (cliFlags , flags .leaderElectionConfig .Flags ()... )
160166 cliFlags = append (cliFlags , flags .kubeClientConfig .Flags ()... )
161167 cliFlags = append (cliFlags , featureGateConfig .Flags ()... )
162168 cliFlags = append (cliFlags , loggingConfig .Flags ()... )
@@ -217,12 +223,19 @@ func newApp() *cli.App {
217223 controller := NewController (config )
218224 ctx , cancel := context .WithCancel (c .Context )
219225 go func () {
220- errChan <- controller .Run (ctx )
226+ // Fallback to standalone mode if leader election is disabled
227+ if ! config .flags .leaderElectionConfig .Enabled {
228+ klog .Info ("Leader election disabled, starting controller directly" )
229+ errChan <- controller .Run (ctx )
230+ return
231+ }
232+ errChan <- runWithLeaderElection (ctx , config , controller )
221233 }()
222234
223235 for {
224236 select {
225- case <- sigs :
237+ case sig := <- sigs :
238+ klog .InfoS ("Received signal, shutting down" , "signal" , sig .String ())
226239 cancel ()
227240 case err := <- errChan :
228241 cancel ()
@@ -253,6 +266,109 @@ func newApp() *cli.App {
253266 return app
254267}
255268
269+ func runWithLeaderElection (ctx context.Context , config * Config , controller * Controller ) error {
270+ klog .Info ("Leader election enabled" )
271+ // Unique identity: PodName + UUID to prevent conflicts on restarts
272+ id := uuid .New ().String ()
273+ lockID := fmt .Sprintf ("%s-%s" , config .flags .podName , id )
274+ klog .InfoS ("Leader election candidate registered" , "lockID" , lockID ,
275+ "leaseName" , config .flags .leaderElectionConfig .LeaseLockName ,
276+ "leaseNamespace" , config .flags .leaderElectionConfig .LeaseLockNamespace )
277+
278+ // electorCtx controls the lifecycle of the leader election loop
279+ electorCtx , cancelElector := context .WithCancel (ctx )
280+ // Standard defer to ensure resources are cleaned up on function exit
281+ defer cancelElector ()
282+
283+ lock := & resourcelock.LeaseLock {
284+ LeaseMeta : metav1.ObjectMeta {
285+ Name : config .flags .leaderElectionConfig .LeaseLockName ,
286+ Namespace : config .flags .leaderElectionConfig .LeaseLockNamespace ,
287+ },
288+ Client : config .clientsets .Core .CoordinationV1 (),
289+ LockConfig : resourcelock.ResourceLockConfig {
290+ Identity : lockID ,
291+ },
292+ }
293+
294+ controllerErrCh := make (chan error , 1 )
295+ callbacks := leaderelection.LeaderCallbacks {
296+ OnStartedLeading : func (leaderCtx context.Context ) {
297+ klog .InfoS ("Became leader, starting controller" , "lockID" , lockID )
298+
299+ // ARCHITECTURE NOTE:
300+ // We use cancelElector() to ensure that if the controller logic exits
301+ // (either gracefully or with an error), the entire leader election loop
302+ // terminates. This triggers ReleaseOnCancel, clearing the lease holder
303+ // identity and allowing standby replicas to take over immediately.
304+ //
305+ // By returning from run() after elector.Run() finishes, we rely on
306+ // Kubernetes to restart the Pod, ensuring a clean in-memory state
307+ // for the next leadership term.
308+ defer cancelElector ()
309+
310+ // NOTE: Use leaderCtx provided by the callback.
311+ // It is automatically cancelled if leadership is lost.
312+ if err := controller .Run (leaderCtx ); err != nil {
313+ select {
314+ case controllerErrCh <- err :
315+ default :
316+ }
317+ klog .ErrorS (err , "Controller exited with error" , "lockID" , lockID )
318+ } else {
319+ klog .InfoS ("Controller exited gracefully" , "lockID" , lockID )
320+ }
321+ },
322+ OnStoppedLeading : func () {
323+ // ARCHITECTURE NOTE:
324+ // We only log here. The actual shutdown of the controller is handled by the
325+ // cancellation of the leaderCtx passed to OnStartedLeading.
326+ // When leadership is lost, the library cancels that context, triggering
327+ // the controller's graceful shutdown logic.
328+ klog .Warningf ("Stopped leading, lockID: %s" , lockID )
329+ },
330+ OnNewLeader : func (identity string ) {
331+ // OnNewLeader is called when a new leader is observed.
332+ // We ignore the case where the "new" leader is ourselves to avoid
333+ // redundant logs during initial election or re-election.
334+ if identity == lockID {
335+ klog .V (6 ).InfoS ("OnNewLeader callback: observed leader is still ourselves" , "lockID" , lockID )
336+ return
337+ }
338+ klog .InfoS ("New leader elected" , "leader" , identity , "currentCandidate" , lockID )
339+ },
340+ }
341+
342+ elector , err := leaderelection .NewLeaderElector (leaderelection.LeaderElectionConfig {
343+ Lock : lock ,
344+ LeaseDuration : config .flags .leaderElectionConfig .LeaseDuration ,
345+ RenewDeadline : config .flags .leaderElectionConfig .RenewDeadline ,
346+ RetryPeriod : config .flags .leaderElectionConfig .RetryPeriod ,
347+ Name : config .flags .leaderElectionConfig .LeaseLockName ,
348+ Callbacks : callbacks ,
349+ ReleaseOnCancel : true , // Steps down immediately by clearing the Lease holder
350+ })
351+ if err != nil {
352+ return fmt .Errorf ("failed to create leader elector: %w" , err )
353+ }
354+
355+ // Block until electorCtx is cancelled or leadership is lost
356+ klog .InfoS ("Starting leader election loop" , "lockID" , lockID )
357+ elector .Run (electorCtx )
358+
359+ // If exiting due to a controller failure, propagate the error to main
360+ select {
361+ case err := <- controllerErrCh :
362+ if err != nil {
363+ klog .ErrorS (err , "Process exiting due to controller failure" )
364+ return fmt .Errorf ("controller execution failed: %w" , err )
365+ }
366+ default :
367+ }
368+ klog .InfoS ("Leader election loop ended gracefully" , "lockID" , lockID )
369+ return nil
370+ }
371+
256372func SetupHTTPEndpoint (config * Config ) error {
257373 if config .flags .metricsPath != "" {
258374 // To collect metrics data from the metric handler itself, we
0 commit comments