11package main
22
33import (
4+ "context"
45 "flag"
56 "fmt"
67 "os"
@@ -10,10 +11,13 @@ import (
1011 "github.com/fsnotify/fsnotify"
1112 "github.com/go-co-op/gocron/v2"
1213 corev1 "k8s.io/api/core/v1"
14+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1315 "k8s.io/client-go/kubernetes"
1416 "k8s.io/client-go/kubernetes/scheme"
1517 typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
1618 "k8s.io/client-go/rest"
19+ "k8s.io/client-go/tools/leaderelection"
20+ "k8s.io/client-go/tools/leaderelection/resourcelock"
1721 "k8s.io/client-go/tools/record"
1822
1923 nadclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
@@ -30,18 +34,21 @@ const (
3034 allNamespaces = ""
3135 controllerName = "pod-ip-controlloop"
3236 reconcilerCronConfiguration = "/cron-schedule/config"
37+ reconcilerLeaderLeaseName = "whereabouts-reconciler-lock"
38+ defaultWhereaboutsNamespace = "kube-system"
3339)
3440
3541const (
3642 _ int = iota
3743 couldNotCreateController
38- cronSchedulerCreationError
39- fileWatcherError
40- couldNotCreateConfigWatcherError
44+ couldNotInitializeReconcilerLeaderElection
4145)
4246
4347const (
44- defaultLogLevel = "debug"
48+ defaultLogLevel = "debug"
49+ reconcilerLeaderLeaseDuration = 15 * time .Second
50+ reconcilerLeaderRenewDeadline = 10 * time .Second
51+ reconcilerLeaderRetryPeriod = 2 * time .Second
4552)
4653
4754func main () {
@@ -66,53 +73,186 @@ func main() {
6673 networkController .Start (stopChan )
6774 defer networkController .Shutdown ()
6875
69- s , err := gocron .NewScheduler (gocron .WithLocation (time .UTC ))
76+ leaderElectionCtx , cancelLeaderElection := context .WithCancel (context .Background ())
77+ defer cancelLeaderElection ()
78+ go runReconcilerLeaderElectionLoop (leaderElectionCtx , errorChan )
79+
80+ for {
81+ select {
82+ case <- stopChan :
83+ logging .Verbosef ("shutting down network controller" )
84+ cancelLeaderElection ()
85+ return
86+ case err := <- errorChan :
87+ if err == nil {
88+ logging .Verbosef ("reconciler success" )
89+ } else {
90+ logging .Verbosef ("reconciler failure: %s" , err )
91+ }
92+ }
93+ }
94+ }
95+
96+ func runReconcilerLeaderElectionLoop (ctx context.Context , errorChan chan error ) {
97+ namespace := whereaboutsNamespace ()
98+ identity := reconcilerLeaderIdentity ()
99+
100+ cfg , err := rest .InClusterConfig ()
70101 if err != nil {
71- os .Exit (cronSchedulerCreationError )
102+ _ = logging .Errorf ("failed to generate in-cluster config for reconciler leader election: %v" , err )
103+ os .Exit (couldNotInitializeReconcilerLeaderElection )
104+ }
105+
106+ k8sClientSet , err := kubernetes .NewForConfig (cfg )
107+ if err != nil {
108+ _ = logging .Errorf ("failed to create kubernetes client for reconciler leader election: %v" , err )
109+ os .Exit (couldNotInitializeReconcilerLeaderElection )
110+ }
111+
112+ if ctx .Err () != nil {
113+ return
114+ }
115+
116+ electionCtx , cancelElection := context .WithCancel (ctx )
117+ err = runReconcilerLeaderElection (
118+ electionCtx ,
119+ k8sClientSet ,
120+ namespace ,
121+ identity ,
122+ errorChan ,
123+ cancelElection ,
124+ )
125+ cancelElection ()
126+ if err != nil {
127+ errorChan <- err
128+ }
129+ }
130+
131+ func runReconcilerLeaderElection (
132+ ctx context.Context ,
133+ k8sClientSet kubernetes.Interface ,
134+ namespace string ,
135+ identity string ,
136+ errorChan chan error ,
137+ cancelElection context.CancelFunc ,
138+ ) error {
139+ leaseLock := & resourcelock.LeaseLock {
140+ LeaseMeta : metav1.ObjectMeta {
141+ Name : reconcilerLeaderLeaseName ,
142+ Namespace : namespace ,
143+ },
144+ Client : k8sClientSet .CoordinationV1 (),
145+ LockConfig : resourcelock.ResourceLockConfig {
146+ Identity : identity ,
147+ },
148+ }
149+
150+ leaderElector , err := leaderelection .NewLeaderElector (leaderelection.LeaderElectionConfig {
151+ Lock : leaseLock ,
152+ LeaseDuration : reconcilerLeaderLeaseDuration ,
153+ RenewDeadline : reconcilerLeaderRenewDeadline ,
154+ RetryPeriod : reconcilerLeaderRetryPeriod ,
155+ ReleaseOnCancel : true ,
156+ Name : reconcilerLeaderLeaseName ,
157+ Callbacks : leaderelection.LeaderCallbacks {
158+ OnStartedLeading : func (leadingCtx context.Context ) {
159+ logging .Verbosef ("acquired reconciler leadership (%s/%s) as %q" , namespace , reconcilerLeaderLeaseName , identity )
160+ if err := runScheduledReconciler (leadingCtx , errorChan ); err != nil {
161+ errorChan <- err
162+ cancelElection ()
163+ }
164+ },
165+ OnStoppedLeading : func () {
166+ logging .Verbosef ("lost reconciler leadership (%s/%s)" , namespace , reconcilerLeaderLeaseName )
167+ },
168+ OnNewLeader : func (currentLeader string ) {
169+ if currentLeader == identity {
170+ logging .Verbosef ("this pod is reconciler leader: %q" , currentLeader )
171+ return
172+ }
173+ logging .Verbosef ("reconciler leader is now: %q" , currentLeader )
174+ },
175+ },
176+ })
177+ if err != nil {
178+ return fmt .Errorf ("failed to create reconciler leader elector: %w" , err )
179+ }
180+
181+ logging .Verbosef ("starting reconciler leader election (%s/%s) with identity %q" , namespace , reconcilerLeaderLeaseName , identity )
182+ leaderElector .Run (ctx )
183+ return nil
184+ }
185+
186+ func runScheduledReconciler (ctx context.Context , errorChan chan error ) error {
187+ scheduler , err := gocron .NewScheduler (gocron .WithLocation (time .UTC ))
188+ if err != nil {
189+ return fmt .Errorf ("failed to create reconciler cron scheduler: %w" , err )
72190 }
73191
74192 watcher , err := fsnotify .NewWatcher ()
75193 if err != nil {
76- _ = logging .Errorf ("error creating configuration watcher: %v" , err )
77- os .Exit (fileWatcherError )
194+ if shutdownErr := scheduler .Shutdown (); shutdownErr != nil {
195+ _ = logging .Errorf ("failed to shutdown reconciler scheduler after watcher initialization error: %v" , shutdownErr )
196+ }
197+ return fmt .Errorf ("error creating reconciler configuration watcher: %w" , err )
78198 }
79- defer watcher .Close ()
199+ defer func () {
200+ if closeErr := watcher .Close (); closeErr != nil {
201+ _ = logging .Errorf ("error closing reconciler configuration watcher: %v" , closeErr )
202+ }
203+ }()
80204
81205 reconcilerConfigWatcher , err := reconciler .NewConfigWatcher (
82206 reconcilerCronConfiguration ,
83- s ,
207+ scheduler ,
84208 watcher ,
85209 func () {
86210 reconciler .ReconcileIPs (errorChan )
87211 },
88212 )
89213 if err != nil {
90- os .Exit (couldNotCreateConfigWatcherError )
214+ if shutdownErr := scheduler .Shutdown (); shutdownErr != nil {
215+ _ = logging .Errorf ("failed to shutdown reconciler scheduler after config watcher error: %v" , shutdownErr )
216+ }
217+ return fmt .Errorf ("could not create reconciler config watcher: %w" , err )
91218 }
92- s .Start ()
93219
220+ scheduler .Start ()
94221 const reconcilerConfigMntFile = "/cron-schedule/..data"
95- p := func (e fsnotify.Event ) bool {
96- return e .Name == reconcilerConfigMntFile && e .Op & fsnotify .Create == fsnotify .Create
222+ reconcilerConfigWatcher .SyncConfiguration (func (event fsnotify.Event ) bool {
223+ return event .Name == reconcilerConfigMntFile && event .Op & fsnotify .Create == fsnotify .Create
224+ })
225+
226+ logging .Verbosef ("scheduled reconciler started" )
227+ <- ctx .Done ()
228+ logging .Verbosef ("scheduled reconciler stopping" )
229+
230+ if err := scheduler .Shutdown (); err != nil {
231+ _ = logging .Errorf ("error shutting reconciler scheduler: %v" , err )
97232 }
98- reconcilerConfigWatcher .SyncConfiguration (p )
99233
100- for {
101- select {
102- case <- stopChan :
103- logging .Verbosef ("shutting down network controller" )
104- if err := s .Shutdown (); err != nil {
105- _ = logging .Errorf ("error shutting : %v" , err )
106- }
107- return
108- case err := <- errorChan :
109- if err == nil {
110- logging .Verbosef ("reconciler success" )
111- } else {
112- logging .Verbosef ("reconciler failure: %s" , err )
113- }
114- }
234+ return nil
235+ }
236+
237+ func whereaboutsNamespace () string {
238+ if namespace , found := os .LookupEnv ("WHEREABOUTS_NAMESPACE" ); found && namespace != "" {
239+ return namespace
240+ }
241+ return defaultWhereaboutsNamespace
242+ }
243+
244+ func reconcilerLeaderIdentity () string {
245+ if podName , found := os .LookupEnv ("POD_NAME" ); found && podName != "" {
246+ return podName
247+ }
248+ hostname , err := os .Hostname ()
249+ if err == nil && hostname != "" {
250+ return hostname
251+ }
252+ if nodeName , found := os .LookupEnv ("NODENAME" ); found && nodeName != "" {
253+ return fmt .Sprintf ("%s-%d" , nodeName , os .Getpid ())
115254 }
255+ return fmt .Sprintf ("%s-%d" , reconcilerLeaderLeaseName , os .Getpid ())
116256}
117257
118258func handleSignals (stopChannel chan struct {}, signals ... os.Signal ) {
0 commit comments