Skip to content

Commit 78d9acf

Browse files
naiming-zededa authored and eriknordmark committed
Fix some issues related to Kube Cluster lease election
- use atomic bool for some items which can be accessed by multiple goroutines - have various mechanism to retry, in the case of fail to access api-server, fail to keep the renew deadline, etc. - publish the KubeLeaseInfo() for debugging, otherwise it is very hard to figure out if the lease status is correct or not Signed-off-by: Naiming Shen <naiming@zededa.com>
1 parent abf4f4f commit 78d9acf

File tree

6 files changed

+216
-143
lines changed

6 files changed

+216
-143
lines changed

pkg/pillar/agentbase/apptracker.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,20 +106,20 @@ func GetApplicationInfo(rootRun, persistStatus, persistKubelog, AppUUID string)
106106
appInfo = append(appInfo, OrderedAppInfoItem{Key: structName, Value: ai})
107107
}
108108

109-
// 3) Get the KubeLeaseInfo from zedkuube, it may not exist
109+
// 3) Get the KubeLeaderElectInfo from zedkuube, it may not exist
110110
var structName2 string
111-
structName = "zedkube-KubeLeaseInfo"
112-
leaseInfo := &types.KubeLeaseInfo{}
113-
if structName2, err = readJSONFile(rootRun, structName, "global", leaseInfo); err != nil {
111+
structName = "zedkube-KubeLeaderElectInfo"
112+
leaderInfo := &types.KubeLeaderElectInfo{}
113+
if structName2, err = readJSONFile(rootRun, structName, "global", leaderInfo); err != nil {
114114
appInfo = appendFailedItem(appInfo, structName, "global", err)
115115
} else {
116116
ai = AppInfoItems{ // ENClusterAppStatus
117117
UUID: "global",
118-
Name: leaseInfo.LeaderIdentity,
118+
Name: leaderInfo.LeaderIdentity,
119119
Exist: true,
120-
State: fmt.Sprintf("Is Stats Leader %v", leaseInfo.IsStatsLeader),
121-
Activated: boolToTriState(leaseInfo.InLeaseElection),
122-
Content: fmt.Sprintf("Last time updated at %v", leaseInfo.LatestChange.UTC().Format(time.RFC3339)),
120+
State: fmt.Sprintf("Is Stats Leader %v", leaderInfo.IsStatsLeader),
121+
Activated: boolToTriState(leaderInfo.InLeaderElection),
122+
Content: fmt.Sprintf("Last time updated at %v", leaderInfo.LatestChange.UTC().Format(time.RFC3339)),
123123
}
124124
appInfo = append(appInfo, OrderedAppInfoItem{Key: structName2, Value: ai})
125125
}

pkg/pillar/cmd/zedkube/kubestatscollect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
func (z *zedkube) collectKubeStats() {
2323
// we are the elected leader, start collecting kube stats
2424
// regardless if we are in cluster or single node mode
25-
if z.isKubeStatsLeader {
25+
if z.isKubeStatsLeader.Load() {
2626
log.Functionf("collectKubeStats: Started collecting kube stats")
2727

2828
clientset, err := getKubeClientSet()
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// Copyright (c) 2024-2025 Zededa, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//go:build kubevirt
5+
6+
package zedkube
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
13+
"github.com/lf-edge/eve/pkg/pillar/types"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/client-go/tools/leaderelection"
16+
"k8s.io/client-go/tools/leaderelection/resourcelock"
17+
)
18+
19+
func (z *zedkube) handleLeaderElection() {
20+
var cancelFunc context.CancelFunc
21+
// If we can not perform the leader election, due to kubernetes connection issues
22+
// at the moment, we will retry in 5 minutes
23+
retryTimer := time.NewTimer(0)
24+
retryTimer.Stop() // Ensure the timer is stopped initially
25+
retryTimerStarted := false
26+
for {
27+
log.Noticef("handleLeaderElection: Waiting for signal") // XXX
28+
select {
29+
case <-z.electionStartCh:
30+
31+
// Create a cancellable context
32+
baseCtx, cancel := context.WithCancel(context.Background())
33+
cancelFunc = cancel
34+
35+
clientset, err := getKubeClientSet()
36+
if err != nil {
37+
z.inKubeLeaderElection.Store(false)
38+
z.publishLeaderElectionChange()
39+
log.Errorf("handleLeaderElection: can't get clientset %v, retry in 5 min", err)
40+
retryTimer.Reset(5 * time.Minute)
41+
retryTimerStarted = true
42+
continue
43+
}
44+
45+
// Create a new lease lock
46+
lock := &resourcelock.LeaseLock{
47+
LeaseMeta: metav1.ObjectMeta{
48+
Name: "eve-kube-stats-leader",
49+
Namespace: kubeapi.EVEKubeNameSpace,
50+
},
51+
Client: clientset.CoordinationV1(),
52+
LockConfig: resourcelock.ResourceLockConfig{
53+
Identity: z.nodeName,
54+
},
55+
}
56+
57+
// Define the leader election configuration
58+
lec := leaderelection.LeaderElectionConfig{
59+
Lock: lock,
60+
LeaseDuration: 300 * time.Second,
61+
RenewDeadline: 180 * time.Second,
62+
RetryPeriod: 15 * time.Second,
63+
ReleaseOnCancel: true,
64+
Callbacks: leaderelection.LeaderCallbacks{
65+
OnStartedLeading: func(baseCtx context.Context) {
66+
z.isKubeStatsLeader.Store(true)
67+
z.publishLeaderElectionChange()
68+
log.Noticef("handleLeaderElection: Callback Started leading")
69+
},
70+
OnStoppedLeading: func() {
71+
z.isKubeStatsLeader.Store(false)
72+
z.publishLeaderElectionChange()
73+
log.Noticef("handleLeaderElection: Callback Stopped leading")
74+
},
75+
OnNewLeader: func(identity string) {
76+
z.leaderIdentity = identity
77+
z.publishLeaderElectionChange()
78+
log.Noticef("handleLeaderElection: Callback New leader elected: %s", identity)
79+
},
80+
},
81+
}
82+
83+
// Start the leader election in a separate goroutine
84+
go func() {
85+
leaderelection.RunOrDie(baseCtx, lec)
86+
z.electionFuncRunning.Store(false)
87+
log.Noticef("handleLeaderElection: Leader election routine exited")
88+
if z.inKubeLeaderElection.Load() {
89+
retryTimer.Reset(5 * time.Minute)
90+
retryTimerStarted = true
91+
log.Noticef("handleLeaderElection: We should be inElection, retry in 5 min")
92+
}
93+
z.publishLeaderElectionChange()
94+
}()
95+
z.electionFuncRunning.Store(true)
96+
z.publishLeaderElectionChange()
97+
log.Noticef("handleLeaderElection: Started leader election routine for %s", z.nodeName)
98+
99+
case <-z.electionStopCh:
100+
z.isKubeStatsLeader.Store(false)
101+
z.inKubeLeaderElection.Store(false)
102+
z.leaderIdentity = ""
103+
z.publishLeaderElectionChange()
104+
log.Noticef("handleLeaderElection: Stopped leading signal received")
105+
if retryTimerStarted {
106+
retryTimer.Stop()
107+
retryTimerStarted = false
108+
}
109+
110+
if cancelFunc != nil {
111+
log.Noticef("handleLeaderElection: Stopped. cancelling leader election")
112+
cancelFunc()
113+
cancelFunc = nil
114+
}
115+
116+
case <-retryTimer.C:
117+
log.Noticef("Retrying failed leader election")
118+
sub := z.subZedAgentStatus
119+
items := sub.GetAll()
120+
for _, item := range items {
121+
status := item.(types.ZedAgentStatus)
122+
z.handleControllerStatusChange(&status)
123+
break
124+
}
125+
retryTimerStarted = false
126+
}
127+
}
128+
}
129+
130+
// SignalStartLeaderElection - Function to signal the start of leader election
131+
func (z *zedkube) SignalStartLeaderElection() {
132+
z.inKubeLeaderElection.Store(true)
133+
select {
134+
case z.electionStartCh <- struct{}{}:
135+
log.Noticef("SignalStartLeaderElection: Signal sent successfully")
136+
default:
137+
log.Warningf("SignalStartLeaderElection: Channel is full, signal not sent")
138+
}
139+
}
140+
141+
// SignalStopLeaderElection - Function to signal the stop of leader election
142+
func (z *zedkube) SignalStopLeaderElection() {
143+
select {
144+
case z.electionStopCh <- struct{}{}:
145+
log.Noticef("SignalStopLeaderElection: Signal sent successfully")
146+
default:
147+
log.Warningf("SignalStopLeaderElection: Channel is full, signal not sent")
148+
}
149+
}
150+
151+
func (z *zedkube) handleControllerStatusChange(status *types.ZedAgentStatus) {
152+
configStatus := status.ConfigGetStatus
153+
154+
log.Noticef("handleControllerStatusChange: Leader enter, status %v", configStatus)
155+
switch configStatus {
156+
case types.ConfigGetSuccess, types.ConfigGetReadSaved: // either read success or read from saved config
157+
if !z.inKubeLeaderElection.Load() {
158+
z.SignalStartLeaderElection()
159+
} else {
160+
log.Noticef("handleControllerStatusChange: start. Already in leader election, skip")
161+
}
162+
default:
163+
if z.inKubeLeaderElection.Load() {
164+
z.SignalStopLeaderElection()
165+
} else {
166+
log.Noticef("handleControllerStatusChange: default stop. Not in leader election, skip")
167+
}
168+
}
169+
}
170+
171+
func (z *zedkube) publishLeaderElectionChange() {
172+
// Publish the change in leader
173+
leaderElectinfo := types.KubeLeaderElectInfo{
174+
InLeaderElection: z.inKubeLeaderElection.Load(),
175+
IsStatsLeader: z.isKubeStatsLeader.Load(),
176+
ElectionRunning: z.electionFuncRunning.Load(),
177+
LeaderIdentity: z.leaderIdentity,
178+
LatestChange: time.Now(),
179+
}
180+
z.pubLeaderElectInfo.Publish("global", leaderElectinfo)
181+
}

pkg/pillar/cmd/zedkube/lease.go

Lines changed: 0 additions & 122 deletions
This file was deleted.

pkg/pillar/cmd/zedkube/zedkube.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net/http"
1010
"strings"
1111
"sync"
12+
"sync/atomic"
1213
"time"
1314

1415
"github.com/lf-edge/eve/pkg/pillar/agentbase"
@@ -63,6 +64,7 @@ type zedkube struct {
6364
pubEdgeNodeClusterStatus pubsub.Publication
6465
pubENClusterAppStatus pubsub.Publication
6566
pubKubeClusterInfo pubsub.Publication
67+
pubLeaderElectInfo pubsub.Publication
6668

6769
subNodeDrainRequestZA pubsub.Subscription
6870
subNodeDrainRequestBoM pubsub.Subscription
@@ -78,8 +80,10 @@ type zedkube struct {
7880
clusterIPIsReady bool
7981
nodeuuid string
8082
nodeName string
81-
isKubeStatsLeader bool
82-
inKubeLeaderElection bool
83+
isKubeStatsLeader atomic.Bool
84+
inKubeLeaderElection atomic.Bool
85+
electionFuncRunning atomic.Bool
86+
leaderIdentity string
8387
electionStartCh chan struct{}
8488
electionStopCh chan struct{}
8589
statusServer *http.Server
@@ -203,6 +207,16 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
203207
}
204208
zedkubeCtx.pubKubeClusterInfo = pubKubeClusterInfo
205209

210+
pubLeaderElectInfo, err := ps.NewPublication(
211+
pubsub.PublicationOptions{
212+
AgentName: agentName,
213+
TopicType: types.KubeLeaderElectInfo{},
214+
})
215+
if err != nil {
216+
log.Fatal(err)
217+
}
218+
zedkubeCtx.pubLeaderElectInfo = pubLeaderElectInfo
219+
206220
// Look for global config such as log levels
207221
subGlobalConfig, err := ps.NewSubscription(pubsub.SubscriptionOptions{
208222
AgentName: "zedagent",
@@ -267,8 +281,8 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
267281
subEdgeNodeInfo.Activate()
268282

269283
// start the leader election
270-
zedkubeCtx.electionStartCh = make(chan struct{})
271-
zedkubeCtx.electionStopCh = make(chan struct{})
284+
zedkubeCtx.electionStartCh = make(chan struct{}, 1)
285+
zedkubeCtx.electionStopCh = make(chan struct{}, 1)
272286
go zedkubeCtx.handleLeaderElection()
273287

274288
// Wait for the certs, and nodeInfo which are needed to decrypt the token inside the

0 commit comments

Comments (0)