
Commit 1728875

fix/concurrency-issue (#139)
* use k8s lease to solve concurrent assignment issue
* increase lease duration
* Refactor lease duration from int32 to int and update lease acquisition logic
1 parent 376454b commit 1728875

File tree

8 files changed (+327, -13 lines)


cmd/main.go (+31 -5)

@@ -9,6 +9,7 @@ import (
 	"github.com/doitintl/kubeip/internal/address"
 	"github.com/doitintl/kubeip/internal/config"
+	"github.com/doitintl/kubeip/internal/lease"
 	nd "github.com/doitintl/kubeip/internal/node"
 	"github.com/doitintl/kubeip/internal/types"
 	"github.com/pkg/errors"
@@ -23,8 +24,10 @@ import (
 type contextKey string

 const (
-	developModeKey contextKey = "develop-mode"
-	unassignTimeout = 5 * time.Minute
+	developModeKey       contextKey = "develop-mode"
+	unassignTimeout                 = 5 * time.Minute
+	kubeipLockName                  = "kubeip-lock"
+	defaultLeaseDuration            = 5
 )

 var (
@@ -79,14 +82,17 @@ func prepareLogger(level string, json bool) *logrus.Entry {
 	return log
 }

-func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
+func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Interface, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
 	ctx, cancel := context.WithCancel(c)
 	defer cancel()

 	// ticker for retry interval
 	ticker := time.NewTicker(cfg.RetryInterval)
 	defer ticker.Stop()

+	// create new cluster wide lock
+	lock := lease.NewKubeLeaseLock(client, kubeipLockName, "default", node.Instance, cfg.LeaseDuration)
+
 	for retryCounter := 0; retryCounter <= cfg.RetryAttempts; retryCounter++ {
 		log.WithFields(logrus.Fields{
 			"node": node.Name,
@@ -95,7 +101,20 @@ func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assign
 			"retry-counter":  retryCounter,
 			"retry-attempts": cfg.RetryAttempts,
 		}).Debug("assigning static public IP address to node")
-		err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
+		err := func(ctx context.Context) error {
+			if err := lock.Lock(ctx); err != nil {
+				return errors.Wrap(err, "failed to acquire lock")
+			}
+			log.Debug("lock acquired")
+			defer func() {
+				lock.Unlock(ctx) //nolint:errcheck
+				log.Debug("lock released")
+			}()
+			if err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy); err != nil {
+				return err //nolint:wrapcheck
+			}
+			return nil
+		}(c)
 		if err == nil || errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
 			return nil
 		}
@@ -152,7 +171,7 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
 	errorCh := make(chan error, 1) // buffered channel to avoid goroutine leak
 	go func() {
 		defer close(errorCh) // close the channel when the goroutine exits to avoid goroutine leak
-		e := assignAddress(ctx, log, assigner, n, cfg)
+		e := assignAddress(ctx, log, clientset, assigner, n, cfg)
 		if e != nil {
 			errorCh <- e
 		}
@@ -267,6 +286,13 @@ func main() {
 			EnvVars:  []string{"RETRY_ATTEMPTS"},
 			Category: "Configuration",
 		},
+		&cli.IntFlag{
+			Name:     "lease-duration",
+			Usage:    "duration of the kubernetes lease",
+			Value:    defaultLeaseDuration,
+			EnvVars:  []string{"LEASE_DURATION"},
+			Category: "Configuration",
+		},
 		&cli.BoolFlag{
 			Name:  "release-on-exit",
 			Usage: "release the static public IP address on exit",
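Note on the hunk above: the immediately-invoked closure is what scopes the deferred unlock to a single retry iteration; without it, the lock would only be released when assignAddress itself returned. A stripped-down, runnable sketch of that control flow (toyLock and the loop body are illustrative stand-ins, not part of the commit):

package main

import (
	"context"
	"fmt"
)

// toyLock stands in for lease.KubeLock; illustrative only
type toyLock struct{}

func (toyLock) Lock(ctx context.Context) error   { fmt.Println("lock acquired"); return nil }
func (toyLock) Unlock(ctx context.Context) error { fmt.Println("lock released"); return nil }

func main() {
	ctx := context.Background()
	var lock toyLock

	for attempt := 0; attempt < 3; attempt++ {
		err := func(ctx context.Context) error {
			if err := lock.Lock(ctx); err != nil {
				return err
			}
			// deferred here, so the lock is released at the end of each
			// iteration, not when the surrounding function returns
			defer lock.Unlock(ctx) //nolint:errcheck
			fmt.Println("critical section, attempt", attempt)
			return nil
		}(ctx)
		if err == nil {
			break
		}
	}
}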

cmd/main_test.go (+8 -1)

@@ -11,6 +11,7 @@ import (
 	mocks "github.com/doitintl/kubeip/mocks/address"
 	"github.com/pkg/errors"
 	tmock "github.com/stretchr/testify/mock"
+	"k8s.io/client-go/kubernetes/fake"
 )

 func Test_assignAddress(t *testing.T) {
@@ -45,6 +46,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 3,
 					RetryInterval: time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 		},
@@ -70,6 +72,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 3,
 					RetryInterval: time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 		},
@@ -93,6 +96,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 3,
 					RetryInterval: time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 			wantErr: true,
@@ -125,6 +129,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 10,
 					RetryInterval: 5 * time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 			wantErr: true,
@@ -152,6 +157,7 @@ func Test_assignAddress(t *testing.T) {
 					OrderBy:       "test-order-by",
 					RetryAttempts: 3,
 					RetryInterval: 15 * time.Millisecond,
+					LeaseDuration: 1,
 				},
 			},
 			wantErr: true,
@@ -161,7 +167,8 @@ func Test_assignAddress(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			log := prepareLogger("debug", false)
 			assigner := tt.args.assignerFn(t)
-			if err := assignAddress(tt.args.c, log, assigner, tt.args.node, tt.args.cfg); (err != nil) != tt.wantErr {
+			client := fake.NewSimpleClientset()
+			if err := assignAddress(tt.args.c, log, client, assigner, tt.args.node, tt.args.cfg); (err != nil) != tt.wantErr {
 				t.Errorf("assignAddress() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
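Passing fake.NewSimpleClientset() gives each test case an in-memory API server, so the real lease lock is exercised end to end rather than mocked. A hypothetical standalone test in the same spirit (not part of the commit; it assumes it lives inside the kubeip module, since internal/lease cannot be imported from outside):

package lease_test

import (
	"context"
	"testing"

	"github.com/doitintl/kubeip/internal/lease"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestLockCreatesLease(t *testing.T) {
	client := fake.NewSimpleClientset()
	// "kubeip-lock"/"default"/"node-a" and the 1-second duration are illustrative
	lock := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", "node-a", 1)

	ctx := context.Background()
	if err := lock.Lock(ctx); err != nil {
		t.Fatalf("Lock() error = %v", err)
	}
	defer lock.Unlock(ctx) //nolint:errcheck

	// the lock should materialize as a coordination.k8s.io Lease object
	got, err := client.CoordinationV1().Leases("default").Get(ctx, "kubeip-lock", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("lease not found: %v", err)
	}
	if got.Spec.HolderIdentity == nil || *got.Spec.HolderIdentity != "node-a" {
		t.Errorf("unexpected holder: %v", got.Spec.HolderIdentity)
	}
}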

examples/aws/eks.tf (+5)

@@ -167,6 +167,11 @@ resource "kubernetes_cluster_role" "kubeip_cluster_role" {
     resources  = ["nodes"]
     verbs      = ["get"]
   }
+  rule {
+    api_groups = ["coordination.k8s.io"]
+    resources  = ["leases"]
+    verbs      = ["create", "delete", "get", "list", "update"]
+  }
   depends_on = [
     kubernetes_service_account.kubeip_service_account,
     module.eks

examples/gcp/gke.tf (+9)

@@ -227,6 +227,11 @@ resource "kubernetes_cluster_role" "kubeip_cluster_role" {
     resources  = ["nodes"]
     verbs      = ["get"]
   }
+  rule {
+    api_groups = ["coordination.k8s.io"]
+    resources  = ["leases"]
+    verbs      = ["create", "delete", "get", "list", "update"]
+  }
   depends_on = [
     kubernetes_service_account.kubeip_service_account,
     google_container_cluster.kubeip_cluster
@@ -303,6 +308,10 @@ resource "kubernetes_daemonset" "kubeip_daemonset" {
         name  = "LOG_JSON"
         value = "true"
       }
+      env {
+        name  = "LEASE_DURATION"
+        value = "20"
+      }
       resources {
         requests = {
           cpu = "100m"

internal/address/gcp.go (-7)

@@ -3,7 +3,6 @@ package address
 import (
 	"context"
 	"fmt"
-	"math/rand"
 	"strings"
 	"time"

@@ -27,7 +26,6 @@ const (
 	accessConfigKind    = "compute#accessConfig"
 	defaultPrefixLength = 96
 	maxRetries          = 10 // number of retries for assigning ephemeral public IP address
-	maxWaitListTime     = 10 // max time to wait before listing addresses
 )

 var (
@@ -223,11 +221,6 @@ func (a *gcpAssigner) Assign(ctx context.Context, instanceID, zone string, filte
 		return errors.Wrapf(err, "check if static public IP is already assigned to instance %s", instanceID)
 	}

-	// add random sleep to reduce the chance of multiple kubeip instances getting the same address list
-	waitTime := time.Duration(rand.Intn(maxWaitListTime)) * time.Second //nolint:gosec
-	a.logger.WithField("waitTime", waitTime).Debug("waiting before listing addresses")
-	time.Sleep(waitTime)
-
 	// get available reserved public IP addresses
 	addresses, err := a.listAddresses(filter, orderBy, reservedStatus)
 	if err != nil {

internal/config/config.go (+3)

@@ -29,6 +29,8 @@ type Config struct {
 	RetryAttempts int `json:"retry-attempts"`
 	// ReleaseOnExit releases the IP address on exit
 	ReleaseOnExit bool `json:"release-on-exit"`
+	// LeaseDuration is the duration of the kubernetes lease
+	LeaseDuration int `json:"lease-duration"`
 }

 func NewConfig(c *cli.Context) *Config {
@@ -44,5 +46,6 @@ func NewConfig(c *cli.Context) *Config {
 	cfg.Region = c.String("region")
 	cfg.IPv6 = c.Bool("ipv6")
 	cfg.ReleaseOnExit = c.Bool("release-on-exit")
+	cfg.LeaseDuration = c.Int("lease-duration")
 	return &cfg
 }

internal/lease/lock.go (+135, new file)

@@ -0,0 +1,135 @@
+package lease
+
+import (
+	"context"
+	"time"
+
+	coordinationv1 "k8s.io/api/coordination/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/utils/ptr"
+)
+
+type KubeLock interface {
+	Lock(ctx context.Context) error
+	Unlock(ctx context.Context) error
+}
+
+type kubeLeaseLock struct {
+	client         kubernetes.Interface
+	leaseName      string
+	namespace      string
+	holderIdentity string
+	leaseDuration  int // seconds
+	cancelFunc     context.CancelFunc
+}
+
+func NewKubeLeaseLock(client kubernetes.Interface, leaseName, namespace, holderIdentity string, leaseDurationSeconds int) KubeLock {
+	return &kubeLeaseLock{
+		client:         client,
+		leaseName:      leaseName,
+		namespace:      namespace,
+		holderIdentity: holderIdentity,
+		leaseDuration:  leaseDurationSeconds,
+	}
+}
+
+func (k *kubeLeaseLock) Lock(ctx context.Context) error {
+	backoff := wait.Backoff{
+		Duration: time.Second, // start with 1 second
+		Factor:   1.5,         //nolint:gomnd // multiply by 1.5 on each retry
+		Jitter:   0.5,         //nolint:gomnd // add 50% jitter to wait time on each retry
+		Steps:    100,         //nolint:gomnd // retry 100 times
+		Cap:      time.Hour,   // but never wait more than 1 hour
+	}
+
+	return wait.ExponentialBackoff(backoff, func() (bool, error) { //nolint:wrapcheck
+		timestamp := metav1.MicroTime{Time: time.Now()}
+		lease := &coordinationv1.Lease{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      k.leaseName,
+				Namespace: k.namespace,
+			},
+			Spec: coordinationv1.LeaseSpec{
+				HolderIdentity:       &k.holderIdentity,
+				LeaseDurationSeconds: ptr.To(int32(k.leaseDuration)),
+				AcquireTime:          &timestamp,
+				RenewTime:            &timestamp,
+			},
+		}
+
+		_, err := k.client.CoordinationV1().Leases(k.namespace).Create(ctx, lease, metav1.CreateOptions{})
+		if err != nil {
+			if errors.IsAlreadyExists(err) {
+				// If the lease already exists, check if it's held by another holder
+				existingLease, getErr := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
+				if getErr != nil {
+					return false, getErr //nolint:wrapcheck
+				}
+				// check if the lease is expired
+				if existingLease.Spec.RenewTime != nil && time.Since(existingLease.Spec.RenewTime.Time) > time.Duration(k.leaseDuration)*time.Second {
+					// If the lease is expired, delete it and retry
+					delErr := k.client.CoordinationV1().Leases(k.namespace).Delete(ctx, k.leaseName, metav1.DeleteOptions{})
+					if delErr != nil {
+						return false, delErr //nolint:wrapcheck
+					}
+					return false, nil
+				}
+				// check if the lease is held by another holder
+				if existingLease.Spec.HolderIdentity != nil && *existingLease.Spec.HolderIdentity != k.holderIdentity {
+					// If the lease is held by another holder, return false to retry
+					return false, nil
+				}
+				return true, nil
+			}
+			return false, err //nolint:wrapcheck
+		}
+
+		// Create a child context with cancellation
+		ctx, k.cancelFunc = context.WithCancel(ctx)
+		go k.renewLeasePeriodically(ctx)
+
+		return true, nil
+	})
+}
+
+func (k *kubeLeaseLock) renewLeasePeriodically(ctx context.Context) {
+	// let's renew the lease every 1/2 of the lease duration; use milliseconds for ticker
+	ticker := time.NewTicker(time.Duration(k.leaseDuration*500) * time.Millisecond) //nolint:gomnd
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			lease, err := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
+			if err != nil || lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != k.holderIdentity {
+				return
+			}
+
+			lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
+			k.client.CoordinationV1().Leases(k.namespace).Update(ctx, lease, metav1.UpdateOptions{}) //nolint:errcheck
+		case <-ctx.Done():
+			// Exit the goroutine when the context is cancelled
+			return
+		}
+	}
+}
+
+func (k *kubeLeaseLock) Unlock(ctx context.Context) error {
+	// Call the cancel function to stop the lease renewal process
+	if k.cancelFunc != nil {
+		k.cancelFunc()
+	}
+	lease, err := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
+	if err != nil {
+		return err //nolint:wrapcheck
+	}
+
+	if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != k.holderIdentity {
+		return nil
+	}
+
+	return k.client.CoordinationV1().Leases(k.namespace).Delete(ctx, k.leaseName, metav1.DeleteOptions{}) //nolint:wrapcheck
}
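A minimal sketch of how two holders contend for the same lease, using the fake clientset; it assumes the code lives inside the kubeip module (the package is internal), and names like "node-a"/"node-b" are illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/doitintl/kubeip/internal/lease"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset()
	ctx := context.Background()

	// two contenders for the same lease name in the same namespace
	first := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", "node-a", 5)
	second := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", "node-b", 5)

	if err := first.Lock(ctx); err != nil {
		panic(err)
	}
	fmt.Println("node-a holds the lease")

	done := make(chan struct{})
	go func() {
		defer close(done)
		// blocks in the exponential backoff loop until node-a releases
		if err := second.Lock(ctx); err != nil {
			panic(err)
		}
		fmt.Println("node-b acquired the lease")
		second.Unlock(ctx) //nolint:errcheck
	}()

	time.Sleep(2 * time.Second) // simulate exclusive work under the lock
	first.Unlock(ctx)           //nolint:errcheck
	fmt.Println("node-a released the lease")
	<-done
}

This also illustrates the design choice behind the renewal goroutine: the holder keeps RenewTime fresh at half the lease duration, so a contender only steals the lease (via delete-and-retry) when the holder has genuinely stopped renewing.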
