Skip to content

Commit 7d3815c

Browse files
authored
fix: add csr approval retry (#31)
1 parent d560044 commit 7d3815c

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

actions/approve_csr_handler.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/cenkalti/backoff/v4"
910
"github.com/sirupsen/logrus"
1011
"k8s.io/client-go/kubernetes"
1112

@@ -19,6 +20,8 @@ func newApproveCSRHandler(log logrus.FieldLogger, clientset kubernetes.Interface
1920
clientset: clientset,
2021
csrFetchInterval: 5 * time.Second,
2122
initialCSRFetchTimeout: 5 * time.Minute,
23+
maxRetries: 10,
24+
retryAfter: 1 * time.Second,
2225
}
2326
}
2427

@@ -27,6 +30,8 @@ type approveCSRHandler struct {
2730
clientset kubernetes.Interface
2831
csrFetchInterval time.Duration
2932
initialCSRFetchTimeout time.Duration
33+
maxRetries uint64
34+
retryAfter time.Duration
3035
}
3136

3237
func (h *approveCSRHandler) Handle(ctx context.Context, data interface{}) error {
@@ -37,6 +42,20 @@ func (h *approveCSRHandler) Handle(ctx context.Context, data interface{}) error
3742

3843
log := h.log.WithField("node_name", req.NodeName)
3944

45+
b := backoff.WithContext(
46+
backoff.WithMaxRetries(backoff.NewConstantBackOff(h.retryAfter), h.maxRetries),
47+
ctx,
48+
)
49+
return backoff.RetryNotify(func() error {
50+
return h.handle(ctx, log, req)
51+
}, b, func(err error, duration time.Duration) {
52+
if err != nil {
53+
log.Warnf("csr approval failed, will retry: %v", err)
54+
}
55+
})
56+
}
57+
58+
func (h *approveCSRHandler) handle(ctx context.Context, log logrus.FieldLogger, req *castai.ActionApproveCSR) error {
4059
// First get original csr which is created by kubelet.
4160
log.Debug("getting initial csr")
4261
cert, err := h.getInitialNodeCSR(ctx, req.NodeName)

actions/approve_csr_handler_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package actions
22

33
import (
44
"context"
5+
"fmt"
6+
"sync/atomic"
57
"testing"
68
"time"
79

@@ -61,6 +63,7 @@ AiAHVYZXHxxspoV0hcfn2Pdsl89fIPCOFy/K1PqSUR6QNAIgYdt51ZbQt9rgM2BD
6163
}
6264
return
6365
})
66+
var approveCalls int32
6467
client.PrependReactor("update", "certificatesigningrequests", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
6568
approved := csr.DeepCopy()
6669
approved.Status.Conditions = []certv1.CertificateSigningRequestCondition{
@@ -72,6 +75,12 @@ AiAHVYZXHxxspoV0hcfn2Pdsl89fIPCOFy/K1PqSUR6QNAIgYdt51ZbQt9rgM2BD
7275
Status: v1.ConditionTrue,
7376
},
7477
}
78+
// Simulate failure for some initial calls to test retry.
79+
calls := atomic.LoadInt32(&approveCalls)
80+
if calls < 2 {
81+
atomic.AddInt32(&approveCalls, 1)
82+
return true, approved, fmt.Errorf("ups")
83+
}
7584
return true, approved, nil
7685
})
7786

@@ -80,6 +89,8 @@ AiAHVYZXHxxspoV0hcfn2Pdsl89fIPCOFy/K1PqSUR6QNAIgYdt51ZbQt9rgM2BD
8089
clientset: client,
8190
csrFetchInterval: 1 * time.Millisecond,
8291
initialCSRFetchTimeout: 10 * time.Millisecond,
92+
retryAfter: 100 * time.Millisecond,
93+
maxRetries: 5,
8394
}
8495

8596
ctx := context.Background()

0 commit comments

Comments
 (0)