Skip to content

Commit 755dae7

Browse files
authored
Leader election health check for probes (#25)
1 parent 7257411 commit 755dae7

File tree

5 files changed

+62
-39
lines changed

5 files changed

+62
-39
lines changed

config/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ func Get() Config {
5757
if cfg.Log.Level == 0 {
5858
cfg.Log.Level = int(logrus.InfoLevel)
5959
}
60-
60+
if cfg.PprofPort == 0 {
61+
cfg.PprofPort = 6060
62+
}
6163
if cfg.API.Key == "" {
6264
required("API_KEY")
6365
}

config/config_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ func TestConfig(t *testing.T) {
1212
require.NoError(t, os.Setenv("API_URL", "api.cast.ai"))
1313
require.NoError(t, os.Setenv("KUBECONFIG", "~/.kube/config"))
1414
require.NoError(t, os.Setenv("CLUSTER_ID", "c1"))
15-
require.NoError(t, os.Setenv("PPROF_PORT", "6060"))
1615
require.NoError(t, os.Setenv("LEADER_ELECTION_ENABLED", "true"))
1716
require.NoError(t, os.Setenv("LEADER_ELECTION_NAMESPACE", "castai-agent"))
1817
require.NoError(t, os.Setenv("LEADER_ELECTION_LOCK_NAME", "castai-cluster-controller"))
@@ -23,7 +22,6 @@ func TestConfig(t *testing.T) {
2322
require.Equal(t, "api.cast.ai", cfg.API.URL)
2423
require.Equal(t, "~/.kube/config", cfg.Kubeconfig)
2524
require.Equal(t, "c1", cfg.ClusterID)
26-
require.Equal(t, 6060, cfg.PprofPort)
2725
require.Equal(t, true, cfg.LeaderElection.Enabled)
2826
require.Equal(t, "castai-agent", cfg.LeaderElection.Namespace)
2927
require.Equal(t, "castai-cluster-controller", cfg.LeaderElection.LockName)

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
helm.sh/helm/v3 v3.8.1
1616
k8s.io/api v0.23.4
1717
k8s.io/apimachinery v0.23.4
18+
k8s.io/apiserver v0.23.4
1819
k8s.io/client-go v0.23.4
1920
sigs.k8s.io/controller-runtime v0.8.3
2021
)
@@ -31,6 +32,7 @@ require (
3132
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
3233
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect
3334
github.com/beorn7/perks v1.0.1 // indirect
35+
github.com/blang/semver v3.5.1+incompatible // indirect
3436
github.com/cespare/xxhash/v2 v2.1.2 // indirect
3537
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect
3638
github.com/containerd/containerd v1.5.9 // indirect
@@ -43,6 +45,7 @@ require (
4345
github.com/docker/go-connections v0.4.0 // indirect
4446
github.com/docker/go-metrics v0.0.1 // indirect
4547
github.com/docker/go-units v0.4.0 // indirect
48+
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
4649
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
4750
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
4851
github.com/fatih/color v1.13.0 // indirect
@@ -133,7 +136,6 @@ require (
133136
gopkg.in/yaml.v2 v2.4.0 // indirect
134137
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
135138
k8s.io/apiextensions-apiserver v0.23.4 // indirect
136-
k8s.io/apiserver v0.23.4 // indirect
137139
k8s.io/cli-runtime v0.23.4 // indirect
138140
k8s.io/component-base v0.23.4 // indirect
139141
k8s.io/klog/v2 v2.30.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edY
159159
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
160160
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
161161
github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
162+
github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
162163
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
163164
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
164165
github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk=
@@ -376,6 +377,7 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
376377
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
377378
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
378379
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
380+
github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=
379381
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
380382
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
381383
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=

main.go

Lines changed: 54 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import (
66
"fmt"
77
"io/ioutil"
88
"net/http"
9-
_ "net/http/pprof"
9+
"net/http/pprof"
1010
"os"
1111
"time"
1212

1313
"github.com/google/uuid"
1414
"github.com/sirupsen/logrus"
1515
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16+
"k8s.io/apiserver/pkg/server/healthz"
1617
"k8s.io/client-go/kubernetes"
1718
"k8s.io/client-go/rest"
1819
"k8s.io/client-go/tools/clientcmd"
@@ -112,16 +113,6 @@ func run(
112113
})
113114
log.Infof("running castai-cluster-controller version %v", binVersion)
114115

115-
if cfg.PprofPort != 0 {
116-
go func() {
117-
addr := fmt.Sprintf(":%d", cfg.PprofPort)
118-
log.Infof("starting pprof server on %s", addr)
119-
if err := http.ListenAndServe(addr, http.DefaultServeMux); err != nil {
120-
log.Errorf("failed to start pprof http server: %v", err)
121-
}
122-
}()
123-
}
124-
125116
actionsConfig := actions.Config{
126117
PollWaitInterval: 5 * time.Second,
127118
PollTimeout: 5 * time.Minute,
@@ -139,44 +130,62 @@ func run(
139130
helmClient,
140131
)
141132

133+
httpMux := http.NewServeMux()
134+
var checks []healthz.HealthChecker
135+
var leaderHealthCheck *leaderelection.HealthzAdaptor
142136
if cfg.LeaderElection.Enabled {
143-
lock, err := newLeaseLock(clientset, cfg.LeaderElection.LockName, cfg.LeaderElection.Namespace)
144-
if err != nil {
145-
return err
137+
leaderHealthCheck = leaderelection.NewLeaderHealthzAdaptor(time.Minute * 2)
138+
checks = append(checks, leaderHealthCheck)
139+
}
140+
healthz.InstallHandler(httpMux, checks...)
141+
installPprofHandlers(httpMux)
142+
143+
// Start http server for pprof and health checks handlers.
144+
go func() {
145+
addr := fmt.Sprintf(":%d", cfg.PprofPort)
146+
log.Infof("starting pprof server on %s", addr)
147+
148+
if err := http.ListenAndServe(addr, httpMux); err != nil {
149+
log.Errorf("failed to start pprof http server: %v", err)
146150
}
151+
}()
152+
153+
if cfg.LeaderElection.Enabled {
147154
// Run actions service with leader election. Blocks.
148-
runWithLeaderElection(ctx, log, lock, svc.Run)
149-
return nil
155+
return runWithLeaderElection(ctx, log, clientset, leaderHealthCheck, cfg.LeaderElection, svc.Run)
150156
}
151157

152158
// Run action service. Blocks.
153159
svc.Run(ctx)
154160
return nil
155161
}
156162

157-
func newLeaseLock(client kubernetes.Interface, lockName, lockNamespace string) (*resourcelock.LeaseLock, error) {
163+
func runWithLeaderElection(
164+
ctx context.Context,
165+
log logrus.FieldLogger,
166+
clientset kubernetes.Interface,
167+
watchDog *leaderelection.HealthzAdaptor,
168+
cfg config.LeaderElection,
169+
runFunc func(ctx context.Context),
170+
) error {
158171
id, err := os.Hostname()
159172
if err != nil {
160-
return nil, fmt.Errorf("failed to determine hostname used in leader ID: %w", err)
173+
return fmt.Errorf("failed to determine hostname used in leader ID: %w", err)
161174
}
162175
id = id + "_" + uuid.New().String()
163176

164-
return &resourcelock.LeaseLock{
165-
LeaseMeta: metav1.ObjectMeta{
166-
Name: lockName,
167-
Namespace: lockNamespace,
168-
},
169-
Client: client.CoordinationV1(),
170-
LockConfig: resourcelock.ResourceLockConfig{
171-
Identity: id,
172-
},
173-
}, nil
174-
}
175-
176-
func runWithLeaderElection(ctx context.Context, log logrus.FieldLogger, lock *resourcelock.LeaseLock, runFunc func(ctx context.Context)) {
177177
// Start the leader election code loop
178178
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
179-
Lock: lock,
179+
Lock: &resourcelock.LeaseLock{
180+
LeaseMeta: metav1.ObjectMeta{
181+
Name: cfg.LockName,
182+
Namespace: cfg.Namespace,
183+
},
184+
Client: clientset.CoordinationV1(),
185+
LockConfig: resourcelock.ResourceLockConfig{
186+
Identity: id,
187+
},
188+
},
180189
// IMPORTANT: you MUST ensure that any code you have that
181190
// is protected by the lease must terminate **before**
182191
// you call cancel. Otherwise, you could have a background
@@ -187,25 +196,35 @@ func runWithLeaderElection(ctx context.Context, log logrus.FieldLogger, lock *re
187196
LeaseDuration: 60 * time.Second,
188197
RenewDeadline: 15 * time.Second,
189198
RetryPeriod: 5 * time.Second,
199+
WatchDog: watchDog,
190200
Callbacks: leaderelection.LeaderCallbacks{
191201
OnStartedLeading: func(ctx context.Context) {
192-
log.Infof("started leader: %s", lock.Identity())
202+
log.Infof("started leader: %s", id)
193203
runFunc(ctx)
194204
},
195205
OnStoppedLeading: func() {
196-
log.Infof("leader lost: %s", lock.Identity())
206+
log.Infof("leader lost: %s", id)
197207
os.Exit(0)
198208
},
199209
OnNewLeader: func(identity string) {
200210
// We're notified when new leader elected.
201-
if identity == lock.Identity() {
211+
if identity == id {
202212
// I just got the lock.
203213
return
204214
}
205215
log.Infof("new leader elected: %s", identity)
206216
},
207217
},
208218
})
219+
return nil
220+
}
221+
222+
func installPprofHandlers(mux *http.ServeMux) {
223+
mux.HandleFunc("/debug/pprof/", pprof.Index)
224+
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
225+
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
226+
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
227+
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
209228
}
210229

211230
func kubeConfigFromEnv() (*rest.Config, error) {

0 commit comments

Comments
 (0)