Skip to content

Commit d222017

Browse files
authored
improve signal termination cleanup (#137)
1 parent dd8775d commit d222017

File tree

3 files changed

+65
-27
lines changed

3 files changed

+65
-27
lines changed

cmd/main.go

+24-26
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ import (
2323
type contextKey string
2424

2525
const (
26-
developModeKey contextKey = "develop-mode"
26+
developModeKey contextKey = "develop-mode"
27+
unassignTimeout = 5 * time.Minute
2728
)
2829

2930
var (
@@ -81,36 +82,29 @@ func prepareLogger(level string, json bool) *logrus.Entry {
8182
func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
8283
ctx, cancel := context.WithCancel(c)
8384
defer cancel()
84-
// retry counter
85-
retryCounter := 0
85+
8686
// ticker for retry interval
8787
ticker := time.NewTicker(cfg.RetryInterval)
8888
defer ticker.Stop()
8989

90-
for {
90+
for retryCounter := 0; retryCounter <= cfg.RetryAttempts; retryCounter++ {
9191
err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
92-
if err != nil && errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
93-
log.Infof("static public IP address already assigned to node instance %s", node.Instance)
92+
if err == nil || errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
9493
return nil
9594
}
96-
if err != nil {
97-
log.WithError(err).Errorf("failed to assign static public IP address to node %s", node.Name)
98-
if retryCounter < cfg.RetryAttempts {
99-
retryCounter++
100-
log.Infof("retrying after %v", cfg.RetryInterval)
101-
} else {
102-
log.Infof("reached maximum number of retries (%d)", cfg.RetryAttempts)
103-
return errors.Wrap(err, "reached maximum number of retries")
104-
}
105-
select {
106-
case <-ticker.C:
107-
continue
108-
case <-ctx.Done():
109-
return errors.Wrap(err, "context is done")
110-
}
95+
96+
log.WithError(err).Errorf("failed to assign static public IP address to node %s", node.Name)
97+
log.Infof("retrying after %v", cfg.RetryInterval)
98+
99+
select {
100+
case <-ticker.C:
101+
continue
102+
case <-ctx.Done():
103+
// If the context is done, return an error indicating that the operation was cancelled
104+
return errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
111105
}
112-
return nil
113106
}
107+
return errors.New("reached maximum number of retries")
114108
}
115109

116110
func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
@@ -145,8 +139,9 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
145139
return errors.Wrap(err, "initializing assigner")
146140
}
147141
// assign static public IP address
148-
errorCh := make(chan error)
142+
errorCh := make(chan error, 1) // buffered channel to avoid goroutine leak
149143
go func() {
144+
defer close(errorCh) // close the channel when the goroutine exits to avoid goroutine leak
150145
e := assignAddress(ctx, log, assigner, n, cfg)
151146
if e != nil {
152147
errorCh <- e
@@ -159,13 +154,16 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
159154
return errors.Wrap(err, "assigning static public IP address")
160155
}
161156
case <-ctx.Done():
162-
log.Infof("kubeip agent stopped")
157+
log.Infof("kubeip agent gracefully stopped")
163158
if cfg.ReleaseOnExit {
164159
log.Infof("releasing static public IP address")
160+
releaseCtx, releaseCancel := context.WithTimeout(context.Background(), unassignTimeout) // release the static public IP address within 5 minutes
161+
defer releaseCancel()
165162
// use a different context for releasing the static public IP address since the main context is canceled
166-
if err = assigner.Unassign(context.Background(), n.Instance, n.Zone); err != nil { //nolint:contextcheck
167-
return errors.Wrap(err, "releasing static public IP address")
163+
if err = assigner.Unassign(releaseCtx, n.Instance, n.Zone); err != nil { //nolint:contextcheck
164+
return errors.Wrap(err, "failed to release static public IP address")
168165
}
166+
log.Infof("static public IP address released")
169167
}
170168
}
171169

cmd/main_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,38 @@ func Test_assignAddress(t *testing.T) {
9797
},
9898
wantErr: true,
9999
},
100+
{
101+
name: "context cancelled while assigning addresses",
102+
args: args{
103+
c: func() context.Context {
104+
ctx, cancel := context.WithCancel(context.Background())
105+
go func() {
106+
// Simulate a shutdown signal being received after a short delay
107+
time.Sleep(20 * time.Millisecond)
108+
cancel()
109+
}()
110+
return ctx
111+
}(),
112+
assignerFn: func(t *testing.T) address.Assigner {
113+
mock := mocks.NewAssigner(t)
114+
mock.EXPECT().Assign(tmock.Anything, "test-instance", "test-zone", []string{"test-filter"}, "test-order-by").Return(errors.New("error")).Maybe()
115+
return mock
116+
},
117+
node: &types.Node{
118+
Name: "test-node",
119+
Instance: "test-instance",
120+
Region: "test-region",
121+
Zone: "test-zone",
122+
},
123+
cfg: &config.Config{
124+
Filter: []string{"test-filter"},
125+
OrderBy: "test-order-by",
126+
RetryAttempts: 10,
127+
RetryInterval: 5 * time.Millisecond,
128+
},
129+
},
130+
wantErr: true,
131+
},
100132
{
101133
name: "error after a few retries and context is done",
102134
args: args{

internal/address/gcp.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,12 @@ func (a *gcpAssigner) Assign(ctx context.Context, instanceID, zone string, filte
246246
// try to assign all available addresses until one succeeds
247247
// due to concurrency, it is possible that another kubeip instance will assign the same address
248248
for _, address := range addresses {
249+
// check if context is done before trying to assign an address
250+
if ctx.Err() != nil {
251+
return errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
252+
}
249253
if err = tryAssignAddress(ctx, a, instance, a.region, zone, address); err != nil {
250-
a.logger.WithError(err).Errorf("failed to assign static public IP address %s", address.Address)
254+
a.logger.WithError(err).WithField("address", address.Address).Error("failed to assign static public IP address")
251255
continue
252256
}
253257
// break the loop after successfully assigning an address
@@ -400,6 +404,10 @@ func (a *gcpAssigner) createUserMap(assigned []*compute.Address) map[string]stru
400404

401405
func retryAddEphemeralAddress(ctx context.Context, logger *logrus.Entry, as internalAssigner, instance *compute.Instance, zone string) error {
402406
for i := 0; i < maxRetries; i++ {
407+
// check if context is done before trying to assign an address
408+
if ctx.Err() != nil {
409+
return errors.Wrap(ctx.Err(), "context cancelled while assigning ephemeral addresses")
410+
}
403411
if err := as.AddInstanceAddress(ctx, instance, zone, nil); err != nil {
404412
logger.WithError(err).Error("failed to assign ephemeral public IP address, retrying")
405413
continue

0 commit comments

Comments
 (0)