Skip to content

Commit 8367de8

Browse files
authored
Add leader election (#22)
1 parent b0aaee9 commit 8367de8

File tree

10 files changed

+143
-49
lines changed

10 files changed

+143
-49
lines changed

actions/actions.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type Config struct {
2626
}
2727

2828
type Service interface {
29-
Run(ctx context.Context) error
29+
Run(ctx context.Context)
3030
}
3131

3232
type ActionHandler interface {
@@ -71,23 +71,23 @@ type service struct {
7171
startedActionsMu sync.Mutex
7272
}
7373

74-
func (s *service) Run(ctx context.Context) error {
74+
func (s *service) Run(ctx context.Context) {
7575
for {
7676
select {
7777
case <-time.After(s.cfg.PollWaitInterval):
7878
err := s.doWork(ctx)
7979
if err != nil {
8080
if errors.Is(err, context.Canceled) {
8181
s.log.Info("service stopped")
82-
return nil
82+
return
8383
}
8484

8585
s.log.Errorf("cycle failed: %v", err)
8686
continue
8787
}
8888
case <-ctx.Done():
8989
s.log.Info("service stopped")
90-
return nil
90+
return
9191
}
9292
}
9393
}

actions/actions_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestActions(t *testing.T) {
8686
r.Equal("a2", ids[1])
8787
r.Equal("a3", ids[2])
8888
}()
89-
r.NoError(svc.Run(ctx))
89+
svc.Run(ctx)
9090
})
9191

9292
t.Run("continue polling on api error", func(t *testing.T) {
@@ -103,7 +103,7 @@ func TestActions(t *testing.T) {
103103

104104
r.Len(client.Acks, 0)
105105
}()
106-
r.NoError(svc.Run(ctx))
106+
svc.Run(ctx)
107107
})
108108

109109
t.Run("ack with error when action handler failed", func(t *testing.T) {
@@ -131,7 +131,7 @@ func TestActions(t *testing.T) {
131131
r.Equal("a1", client.Acks[0].ActionID)
132132
r.Equal("handling action *castai.ActionPatchNode: ups", *client.Acks[0].Err)
133133
}()
134-
r.NoError(svc.Run(ctx))
134+
svc.Run(ctx)
135135
})
136136
}
137137

actions/chart_upsert_handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func (c *chartUpsertHandler) Handle(ctx context.Context, data interface{}) error
4747
_, err := c.helm.Install(ctx, helm.InstallOptions{
4848
ChartSource: &req.ChartSource,
4949
Namespace: req.Namespace,
50+
CreateNamespace: req.CreateNamespace,
5051
ReleaseName: req.ReleaseName,
5152
ValuesOverrides: req.ValuesOverrides,
5253
})

castai/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ type ActionChartUpsert struct {
117117
ReleaseName string `json:"releaseName"`
118118
ValuesOverrides map[string]string `json:"valuesOverrides,omitempty"`
119119
ChartSource ChartSource `json:"chartSource"`
120+
CreateNamespace bool `json:"createNamespace"`
120121
}
121122

122123
type ActionChartUninstall struct {

config/config.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,27 @@ import (
88
)
99

1010
type Config struct {
11-
Log Log
12-
API API
13-
Kubeconfig string
14-
ClusterID string
15-
PprofPort int
11+
Log Log
12+
API API
13+
Kubeconfig string
14+
ClusterID string
15+
PprofPort int
16+
LeaderElection LeaderElection
1617
}
1718

1819
type Log struct {
1920
Level int
2021
}
2122

2223
type API struct {
23-
Key string
24-
URL string
24+
Key string
25+
URL string
26+
}
27+
28+
type LeaderElection struct {
29+
Enabled bool
30+
Namespace string
31+
LockName string
2532
}
2633

2734
var cfg *Config
@@ -33,14 +40,14 @@ func Get() Config {
3340
}
3441

3542
_ = viper.BindEnv("log.level", "LOG_LEVEL")
36-
3743
_ = viper.BindEnv("api.key", "API_KEY")
3844
_ = viper.BindEnv("api.url", "API_URL")
3945
_ = viper.BindEnv("clusterid", "CLUSTER_ID")
40-
4146
_ = viper.BindEnv("kubeconfig")
42-
4347
_ = viper.BindEnv("pprofport", "PPROF_PORT")
48+
_ = viper.BindEnv("leaderelection.enabled", "LEADER_ELECTION_ENABLED")
49+
_ = viper.BindEnv("leaderelection.namespace", "LEADER_ELECTION_NAMESPACE")
50+
_ = viper.BindEnv("leaderelection.lockname", "LEADER_ELECTION_LOCK_NAME")
4451

4552
cfg = &Config{}
4653
if err := viper.Unmarshal(&cfg); err != nil {
@@ -60,6 +67,14 @@ func Get() Config {
6067
if cfg.ClusterID == "" {
6168
required("CLUSTER_ID")
6269
}
70+
if cfg.LeaderElection.Enabled {
71+
if cfg.LeaderElection.Namespace == "" {
72+
required("LEADER_ELECTION_NAMESPACE")
73+
}
74+
if cfg.LeaderElection.LockName == "" {
75+
required("LEADER_ELECTION_LOCK_NAME")
76+
}
77+
}
6378

6479
return *cfg
6580
}

config/config_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ func TestConfig(t *testing.T) {
1313
require.NoError(t, os.Setenv("KUBECONFIG", "~/.kube/config"))
1414
require.NoError(t, os.Setenv("CLUSTER_ID", "c1"))
1515
require.NoError(t, os.Setenv("PPROF_PORT", "6060"))
16+
require.NoError(t, os.Setenv("LEADER_ELECTION_ENABLED", "true"))
17+
require.NoError(t, os.Setenv("LEADER_ELECTION_NAMESPACE", "castai-agent"))
18+
require.NoError(t, os.Setenv("LEADER_ELECTION_LOCK_NAME", "castai-cluster-controller"))
1619

1720
cfg := Get()
1821

@@ -21,4 +24,7 @@ func TestConfig(t *testing.T) {
2124
require.Equal(t, "~/.kube/config", cfg.Kubeconfig)
2225
require.Equal(t, "c1", cfg.ClusterID)
2326
require.Equal(t, 6060, cfg.PprofPort)
27+
require.Equal(t, true, cfg.LeaderElection.Enabled)
28+
require.Equal(t, "castai-agent", cfg.LeaderElection.Namespace)
29+
require.Equal(t, "castai-cluster-controller", cfg.LeaderElection.LockName)
2430
}

hack/remote/deploy.sh

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,22 @@ cd "$(git rev-parse --show-toplevel)"
77

88
# Build bo binary and push docker image.
99
IMAGE_TAG=v0.0.1
10-
GOOS=linux go build -ldflags "-X main.Version=${IMAGE_TAG}" -o bin/castai-cluster-controller .
10+
GOOS=linux GOARCH=amd64 go build -ldflags "-X main.Version=${IMAGE_TAG}" -o bin/castai-cluster-controller .
1111
DOCKER_IMAGE_REPO=europe-west3-docker.pkg.dev/ci-master-mo3d/tilt/$USER/castai-cluster-controller
12-
docker build -t $DOCKER_IMAGE_REPO:$IMAGE_TAG .
13-
docker push $DOCKER_IMAGE_REPO:$IMAGE_TAG
12+
docker build -t "$DOCKER_IMAGE_REPO:$IMAGE_TAG" .
13+
docker push "$DOCKER_IMAGE_REPO:$IMAGE_TAG"
1414

1515
# Install local chart and binary.
1616
LOCAL_CHART_DIR=../gh-helm-charts/charts/castai-cluster-controller
1717
helm upgrade -i cluster-controller $LOCAL_CHART_DIR \
1818
-f ./hack/remote/values.yaml \
1919
--set image.repository="$DOCKER_IMAGE_REPO" \
2020
--set image.tag="$IMAGE_TAG" \
21+
--set aks.enabled=false \
2122
--set serviceAccount.create="true" \
2223
--set castai.apiKey="your-api-key" \
2324
--set castai.apiURL="your-api-url" \
2425
--set castai.clusterID="your-cluster-id" \
25-
--set aks.enabled=false \
26-
--reuse-values \
2726
--history-max=3 \
2827
-n castai-agent
2928

helm/chart_loader.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@ import (
66
"context"
77
"fmt"
88
"net/http"
9+
"strings"
910
"time"
1011

11-
"github.com/castai/cluster-controller/castai"
1212
"github.com/cenkalti/backoff/v4"
1313
"helm.sh/helm/v3/pkg/chart"
1414
"helm.sh/helm/v3/pkg/chart/loader"
1515
"helm.sh/helm/v3/pkg/cli"
1616
"helm.sh/helm/v3/pkg/getter"
1717
"helm.sh/helm/v3/pkg/repo"
18+
19+
"github.com/castai/cluster-controller/castai"
1820
)
1921

2022
type ChartLoader interface {
@@ -32,23 +34,21 @@ type remoteChartLoader struct {
3234
func (cl *remoteChartLoader) Load(ctx context.Context, c *castai.ChartSource) (*chart.Chart, error) {
3335
var res *chart.Chart
3436
err := backoff.Retry(func() error {
35-
index, err := cl.downloadHelmIndex(c.RepoURL)
36-
if err != nil {
37-
return err
38-
}
39-
40-
urls, err := cl.chartURLs(index, c.Name, c.Version)
41-
if err != nil {
42-
return err
43-
}
44-
45-
var archiveResp *http.Response
46-
for _, u := range urls {
47-
archiveResp, err = cl.fetchArchive(ctx, u)
48-
if err == nil {
49-
break
37+
var archiveURL string
38+
if strings.HasSuffix(c.RepoURL, ".tgz") {
39+
archiveURL = c.RepoURL
40+
} else {
41+
index, err := cl.downloadHelmIndex(c.RepoURL)
42+
if err != nil {
43+
return err
44+
}
45+
archiveURL, err = cl.chartURL(index, c.Name, c.Version)
46+
if err != nil {
47+
return err
5048
}
5149
}
50+
51+
archiveResp, err := cl.fetchArchive(ctx, archiveURL)
5252
if err != nil {
5353
return err
5454
}
@@ -109,12 +109,12 @@ func (cl *remoteChartLoader) downloadHelmIndex(repoURL string) (*repo.IndexFile,
109109
return index, nil
110110
}
111111

112-
func (cl *remoteChartLoader) chartURLs(index *repo.IndexFile, name, version string) ([]string, error) {
112+
func (cl *remoteChartLoader) chartURL(index *repo.IndexFile, name, version string) (string, error) {
113113
for _, c := range index.Entries[name] {
114114
if c.Version == version && len(c.URLs) > 0 {
115-
return c.URLs, nil
115+
return c.URLs[0], nil
116116
}
117117
}
118118

119-
return nil, fmt.Errorf("finding chart %q version %q in helm repo index", name, version)
119+
return "", fmt.Errorf("finding chart %q version %q in helm repo index", name, version)
120120
}

helm/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
type InstallOptions struct {
2727
ChartSource *castai.ChartSource
2828
Namespace string
29+
CreateNamespace bool
2930
ReleaseName string
3031
ValuesOverrides map[string]string
3132
}
@@ -99,7 +100,7 @@ func (c *client) Install(ctx context.Context, opts InstallOptions) (*release.Rel
99100

100101
install := action.NewInstall(cfg)
101102
install.Namespace = namespace
102-
install.CreateNamespace = true
103+
install.CreateNamespace = opts.CreateNamespace
103104
install.ReleaseName = opts.ReleaseName
104105
install.Timeout = 10 * time.Minute
105106
install.Wait = true // Wait unit all applied resources are running.

0 commit comments

Comments
 (0)