Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
5723e46
KUBE-996: add informer for CSR from "kubernetes.io/kubelet-serving"
ValyaB Mar 20, 2025
0400264
lint
ValyaB Mar 20, 2025
601e57d
add logs
ValyaB Mar 21, 2025
600e016
add debug
ValyaB Mar 21, 2025
b5db061
fix cert key in progress
ValyaB Mar 21, 2025
7cf37b3
update toCertificate
ValyaB Mar 21, 2025
cc33413
update toCertificate
ValyaB Mar 21, 2025
ec32520
rename
ValyaB Mar 21, 2025
7d5f9bf
clean
ValyaB Mar 21, 2025
5b2bb7f
clean
ValyaB Mar 21, 2025
93f5b3d
clean
ValyaB Mar 21, 2025
771bf33
update
ValyaB Mar 21, 2025
5c59af9
clean
ValyaB Mar 21, 2025
b9a3652
back to size 1
ValyaB Mar 21, 2025
f4688bc
return error on not supported CSR
ValyaB Mar 25, 2025
e4e45e0
Merge branch 'main' into KUBE-996
ValyaB Mar 25, 2025
42e1131
rename
ValyaB Mar 25, 2025
26fe10f
rename
ValyaB Mar 25, 2025
88242b2
rename
ValyaB Mar 25, 2025
a5e2abe
tests
ValyaB Mar 25, 2025
243033f
validate certificate
ValyaB Mar 25, 2025
d9fdc04
fix
ValyaB Mar 25, 2025
6af0981
revert return err on empty
ValyaB Mar 25, 2025
87f9b41
lint
ValyaB Mar 25, 2025
26b7dfd
back with error on empty
ValyaB Mar 25, 2025
eabc3bb
clean handler action
ValyaB Mar 25, 2025
c0629de
clean handler action
ValyaB Mar 25, 2025
0e01d87
clean and refactor
ValyaB Mar 25, 2025
d9c8486
lint
ValyaB Mar 25, 2025
77dccf2
add logging
ValyaB Mar 25, 2025
e1ec4c1
fix
ValyaB Mar 25, 2025
e0f95c3
tests
ValyaB Mar 26, 2025
7da7545
lint
ValyaB Mar 26, 2025
5c3aaad
upd
ValyaB Mar 26, 2025
76af4db
fix usages
ValyaB Mar 26, 2025
ecb70d0
clean
ValyaB Mar 26, 2025
c2c61de
added tests
ValyaB Mar 26, 2025
3c3aacc
clean original csrName
ValyaB Mar 26, 2025
d32ecc3
clean
ValyaB Mar 26, 2025
94508f8
clean
ValyaB Mar 26, 2025
bfedbe7
lint
ValyaB Mar 26, 2025
aa10490
lint
ValyaB Mar 26, 2025
a722d21
clean
ValyaB Mar 26, 2025
c01fff6
rename
ValyaB Mar 26, 2025
39f6525
add todo
ValyaB Mar 26, 2025
e33c215
fix
ValyaB Mar 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions e2e/suites/gke.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ func (ts *gkeTestSuite) Run(ctx context.Context, t *testing.T) {
},
})

r.NoError(err, "failed to add node", err)

t.Cleanup(func() {
if err := cleanupNode(); err != nil {
ts.t.Logf("failed to cleanup node %s: %v", *node.Id, err)
}
})

r.NoError(err)

ts.t.Logf("node %s ready", *node.Id)

r.NoError(backoff.Retry(func() error {
Expand Down
162 changes: 7 additions & 155 deletions internal/actions/approve_csr_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,168 +2,20 @@ package actions

import (
"context"
"errors"
"fmt"
"reflect"
"time"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/internal/actions/csr"
"github.com/castai/cluster-controller/internal/castai"
"github.com/castai/cluster-controller/internal/waitext"
)

// approveCSRTimeout is the deadline handleWithRetry puts on the whole
// delete / re-create / approve retry loop for a single CSR.
const approveCSRTimeout = 4 * time.Minute

// Compile-time check that *ApproveCSRHandler satisfies ActionHandler.
var _ ActionHandler = (*ApproveCSRHandler)(nil)

// NewApproveCSRHandler returns an ApproveCSRHandler that logs through log and
// talks to the cluster through clientset. The handler is configured to wait up
// to 5 minutes for a node's initial CSR, pausing 5 seconds before each fetch.
func NewApproveCSRHandler(log logrus.FieldLogger, clientset kubernetes.Interface) *ApproveCSRHandler {
	const (
		defaultInitialCSRFetchTimeout = 5 * time.Minute
		defaultCSRFetchInterval       = 5 * time.Second
	)

	h := &ApproveCSRHandler{log: log, clientset: clientset}
	h.initialCSRFetchTimeout = defaultInitialCSRFetchTimeout
	h.csrFetchInterval = defaultCSRFetchInterval
	return h
}

// ApproveCSRHandler is the ActionHandler for *castai.ActionApproveCSR actions.
// It finds the CSR of the node named in the action and drives it to approval.
type ApproveCSRHandler struct {
	// log is the base logger; Handle derives a per-action entry from it.
	log logrus.FieldLogger
	// clientset is the Kubernetes client handed to the csr helpers.
	clientset kubernetes.Interface

	// Timing of the initial CSR lookup: initialCSRFetchTimeout bounds the whole
	// lookup, csrFetchInterval is the pause before each fetch attempt.
	initialCSRFetchTimeout, csrFetchInterval time.Duration
}

// Handle processes an approve-CSR action. It fetches the initial CSR for the
// node named in the action and, unless that CSR is already approved, runs the
// approval flow with retries.
//
// It returns the error built by newUnexpectedTypeErr when the action payload is
// not a *castai.ActionApproveCSR, and nil without touching the cluster when
// AllowAutoApprove is set and NodeName is empty.
func (h *ApproveCSRHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
	req, ok := action.Data().(*castai.ActionApproveCSR)
	if !ok {
		return newUnexpectedTypeErr(action.Data(), req)
	}

	fields := logrus.Fields{
		"node_name":      req.NodeName,
		"node_id":        req.NodeID,
		"type":           reflect.TypeOf(req).String(),
		ActionIDLogField: action.ID,
	}
	log := h.log.WithFields(fields)

	// A CSR action may be used only to instruct whether to start / stop the
	// watcher responsible for auto-approving; in that case there is no node to
	// work on and nothing more to do.
	if req.AllowAutoApprove != nil && req.NodeName == "" {
		return nil
	}

	initial, err := h.getInitialNodeCSR(ctx, log, req.NodeName)
	if err != nil {
		return fmt.Errorf("getting initial csr: %w", err)
	}
	if !initial.Approved() {
		return h.handleWithRetry(ctx, log, initial)
	}

	log.Debug("csr is already approved")
	return nil
}

// handleWithRetry runs h.handle for cert through waitext.Retry, under a
// deadline of approveCSRTimeout and with the backoff produced by
// newApproveCSRExponentialBackoff. Each failed attempt is logged as a warning.
//
// The attempt always reports true alongside the error from h.handle and the
// retry count is waitext.Forever, so presumably only success or the deadline
// ends the loop (confirm against waitext.Retry).
func (h *ApproveCSRHandler) handleWithRetry(ctx context.Context, log *logrus.Entry, cert *csr.Certificate) error {
	deadlineCtx, cancel := context.WithTimeout(ctx, approveCSRTimeout)
	defer cancel()

	attempt := func(ctx context.Context) (bool, error) {
		return true, h.handle(ctx, log, cert)
	}
	onFailure := func(err error) {
		log.Warnf("csr approval failed, will retry: %v", err)
	}

	return waitext.Retry(deadlineCtx, newApproveCSRExponentialBackoff(), waitext.Forever, attempt, onFailure)
}

func (h *ApproveCSRHandler) handle(ctx context.Context, log logrus.FieldLogger, cert *csr.Certificate) (reterr error) {
// Since this new csr may be denied we need to delete it.
log.Debug("deleting old csr")
if err := cert.DeleteCSR(ctx, h.clientset); err != nil {
return fmt.Errorf("deleting csr: %w", err)
}

// Create a new CSR with the same request data as the original one.
log.Debug("requesting new csr")
newCert, err := cert.NewCSR(ctx, h.clientset)
if err != nil {
return fmt.Errorf("requesting new csr: %w", err)
}
// TODO: clean up once unknown actions are handled properly (https://castai.atlassian.net/browse/KUBE-1036).

// Approve new csr.
log.Debug("approving new csr")
resp, err := newCert.ApproveCSRCertificate(ctx, h.clientset)
if err != nil {
return fmt.Errorf("approving csr: %w", err)
}
if resp.Approved() {
return nil
}
var _ ActionHandler = &ApproveCSRHandlerDeprecated{}

return errors.New("certificate signing request was not approved")
func NewApproveCSRHandler() *ApproveCSRHandlerDeprecated {
return &ApproveCSRHandlerDeprecated{}
}

// getInitialNodeCSR waits for a CSR belonging to nodeName to show up and
// returns it.
//
// The whole lookup is bounded by h.initialCSRFetchTimeout. One polling round
// repeatedly sleeps for h.csrFetchInterval and then calls
// csr.GetCertificateByNodeName. csr.ErrNodeCertificateNotFound means "not there
// yet" and keeps the round going; any other error, or the context finishing,
// ends the round. Rounds are driven by waitext.Retry with the default
// exponential backoff and a retry count of 3. A context.DeadlineExceeded error
// is reported with a false flag and every other outcome with true (presumably
// "stop" versus "may retry"; confirm against waitext.Retry).
//
// It returns the certificate found by the last round (nil if none) together
// with the error returned by waitext.Retry.
func (h *ApproveCSRHandler) getInitialNodeCSR(ctx context.Context, log logrus.FieldLogger, nodeName string) (*csr.Certificate, error) {
	log.Debug("getting initial csr")

	ctx, cancel := context.WithTimeout(ctx, h.initialCSRFetchTimeout)
	defer cancel()

	// pollOnce blocks until a certificate is found, a non-"not found" error
	// occurs, or ctx is done. Note that it waits one interval before the very
	// first fetch as well.
	pollOnce := func() (*csr.Certificate, error) {
		for {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(h.csrFetchInterval):
			}

			found, err := csr.GetCertificateByNodeName(ctx, h.clientset, nodeName)
			switch {
			case err != nil && !errors.Is(err, csr.ErrNodeCertificateNotFound):
				return nil, err
			case found != nil:
				return found, nil
			}
		}
	}

	var cert *csr.Certificate

	attempt := func(_ context.Context) (bool, error) {
		var pollErr error
		cert, pollErr = pollOnce()
		return !errors.Is(pollErr, context.DeadlineExceeded), pollErr
	}
	onFailure := func(err error) {
		log.Warnf("getting initial csr, will retry: %v", err)
	}

	err := waitext.Retry(ctx, waitext.DefaultExponentialBackoff(), 3, attempt, onFailure)
	return cert, err
}
// ApproveCSRHandlerDeprecated is a stateless ActionHandler whose Handle method
// ignores the action it is given and returns nil.
type ApproveCSRHandlerDeprecated struct{}

func newApproveCSRExponentialBackoff() wait.Backoff {
b := waitext.DefaultExponentialBackoff()
b.Factor = 2
return b
func (h *ApproveCSRHandlerDeprecated) Handle(_ context.Context, _ *castai.ClusterAction) error {
return nil
}
Loading
Loading