Skip to content

Commit c8d6bcf

Browse files
committed
use shared informer for node status check
1 parent e3b15f1 commit c8d6bcf

File tree

14 files changed

+737
-66
lines changed

14 files changed

+737
-66
lines changed

cmd/controller/run.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/castai/cluster-controller/cmd/utils"
2727
"github.com/castai/cluster-controller/health"
28+
"github.com/castai/cluster-controller/internal/actions"
2829
"github.com/castai/cluster-controller/internal/actions/csr"
2930
"github.com/castai/cluster-controller/internal/castai"
3031
"github.com/castai/cluster-controller/internal/config"
@@ -131,6 +132,22 @@ func runController(
131132

132133
log.Infof("running castai-cluster-controller version %v, log-level: %v", binVersion, logger.Level)
133134

135+
// Create global informer manager if enabled
136+
log.Info("initializing global informer manager...")
137+
informerManager := actions.NewInformerManager(
138+
log,
139+
clientset,
140+
cfg.Informer.ResyncPeriod,
141+
)
142+
143+
// Start informer and wait for cache sync (blocks until synced)
144+
syncCtx, syncCancel := context.WithTimeout(ctx, 30*time.Second)
145+
defer syncCancel()
146+
147+
if err := informerManager.Start(syncCtx); err != nil {
148+
return fmt.Errorf("starting informer manager: %w", err)
149+
}
150+
134151
actionsConfig := controller.Config{
135152
PollWaitInterval: 5 * time.Second,
136153
PollTimeout: maxRequestTimeout,
@@ -153,11 +170,16 @@ func runController(
153170
client,
154171
helmClient,
155172
healthzAction,
173+
informerManager,
156174
)
157175
defer func() {
158176
if err := svc.Close(); err != nil {
159177
log.Errorf("failed to close controller service: %v", err)
160178
}
179+
// Stop informer manager on shutdown
180+
if informerManager != nil {
181+
informerManager.Stop()
182+
}
161183
}()
162184

163185
if cfg.Metrics.ExportEnabled {

cmd/testserver/run.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ func run(ctx context.Context) error {
5151
// Choose scenarios below by adding/removing/etc. instances of scenarios.XXX()
5252
// All scenarios in the list run in parallel (but not necessarily at the same time if preparation takes different time).
5353
testScenarios := []scenarios.TestScenario{
54-
scenarios.CheckNodeDeletedStuck(300, logger),
54+
// scenarios.CheckNodeDeletedStuck(300, logger),
55+
scenarios.CheckNodeStatus(10000, logger),
5556
}
5657

5758
var wg sync.WaitGroup

internal/actions/check_node_status.go

Lines changed: 158 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,30 @@ import (
77
"reflect"
88
"time"
99

10-
"github.com/samber/lo"
1110
"github.com/sirupsen/logrus"
1211
corev1 "k8s.io/api/core/v1"
13-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
k8serrors "k8s.io/apimachinery/pkg/api/errors"
1413
"k8s.io/client-go/kubernetes"
15-
"k8s.io/client-go/kubernetes/typed/core/v1"
14+
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
15+
"k8s.io/client-go/tools/cache"
1616

1717
"github.com/castai/cluster-controller/internal/castai"
18-
"github.com/castai/cluster-controller/internal/waitext"
1918
)
2019

2120
var _ ActionHandler = &CheckNodeStatusHandler{}
2221

23-
func NewCheckNodeStatusHandler(log logrus.FieldLogger, clientset kubernetes.Interface) *CheckNodeStatusHandler {
22+
func NewCheckNodeStatusHandler(log logrus.FieldLogger, clientset kubernetes.Interface, informerManager *InformerManager) *CheckNodeStatusHandler {
2423
return &CheckNodeStatusHandler{
25-
log: log,
26-
clientset: clientset,
24+
log: log,
25+
clientset: clientset,
26+
informerManager: informerManager,
2727
}
2828
}
2929

3030
// CheckNodeStatusHandler handles "check node status" cluster actions,
// observing node deletion and readiness via the shared informer cache.
type CheckNodeStatusHandler struct {
	// log is the base logger; per-action entries are derived from it.
	log logrus.FieldLogger
	// clientset is the Kubernetes API client — presumably retained for
	// direct API access where the cache is insufficient; TODO confirm usage.
	clientset kubernetes.Interface
	// informerManager supplies the shared node informer and lister used
	// instead of per-action watches/polling.
	informerManager *InformerManager
}
3435

3536
func (h *CheckNodeStatusHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
@@ -50,7 +51,6 @@ func (h *CheckNodeStatusHandler) Handle(ctx context.Context, action *castai.Clus
5051
ActionIDLogField: action.ID,
5152
})
5253

53-
log.Info("checking status of node")
5454
if req.NodeName == "" ||
5555
(req.NodeID == "" && req.ProviderId == "") {
5656
return fmt.Errorf("node name or node ID/provider ID is empty %w", errAction)
@@ -77,18 +77,7 @@ func (h *CheckNodeStatusHandler) checkNodeDeleted(ctx context.Context, log *logr
7777
ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
7878
defer cancel()
7979

80-
b := waitext.DefaultExponentialBackoff()
81-
return waitext.Retry(
82-
ctx,
83-
b,
84-
waitext.Forever,
85-
func(ctx context.Context) (bool, error) {
86-
return checkNodeDeleted(ctx, h.clientset.CoreV1().Nodes(), req.NodeName, req.NodeID, req.ProviderId, log)
87-
},
88-
func(err error) {
89-
log.Warnf("check node %s status failed, will retry: %v", req.NodeName, err)
90-
},
91-
)
80+
return h.checkNodeDeletedWithInformer(ctx, req.NodeName, req.NodeID, req.ProviderId, log)
9281
}
9382

9483
func checkNodeDeleted(ctx context.Context, clientSet v1.NodeInterface, nodeName, nodeID, providerID string, log logrus.FieldLogger) (bool, error) {
@@ -117,40 +106,167 @@ func checkNodeDeleted(ctx context.Context, clientSet v1.NodeInterface, nodeName,
117106
return false, errNodeNotDeleted
118107
}
119108

120-
func (h *CheckNodeStatusHandler) checkNodeReady(ctx context.Context, _ *logrus.Entry, req *castai.ActionCheckNodeStatus) error {
109+
func (h *CheckNodeStatusHandler) checkNodeDeletedWithInformer(ctx context.Context, nodeName, nodeID, providerID string, log logrus.FieldLogger) error {
110+
lister := h.informerManager.GetNodeLister()
111+
112+
// Check if node is already deleted in cache
113+
node, err := lister.Get(nodeName)
114+
if err != nil {
115+
if k8serrors.IsNotFound(err) {
116+
log.Info("node already deleted in cache")
117+
return nil // Node deleted
118+
}
119+
return fmt.Errorf("getting node from lister: %w", err)
120+
}
121+
122+
// Check if node ID/provider ID don't match (name reused)
123+
if err := isNodeIDProviderIDValid(node, nodeID, providerID, log); err != nil {
124+
if errors.Is(err, errNodeDoesNotMatch) {
125+
log.Info("node name reused, original node deleted")
126+
return nil // Name reused, original deleted
127+
}
128+
return fmt.Errorf("validating node ID/provider ID: %w", err)
129+
}
130+
131+
// Set up channel to receive notification when node is deleted
132+
deleted := make(chan struct{})
133+
errCh := make(chan error, 1)
134+
135+
informer := h.informerManager.GetNodeInformer()
136+
137+
// Register event handler to watch for node deletion
138+
registration, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
139+
UpdateFunc: func(oldObj, newObj any) {
140+
node, ok := newObj.(*corev1.Node)
141+
if !ok || node.Name != nodeName {
142+
return
143+
}
144+
// Check if node was replaced (ID mismatch)
145+
if err := isNodeIDProviderIDValid(node, nodeID, providerID, log); err != nil {
146+
if errors.Is(err, errNodeDoesNotMatch) {
147+
log.Info("node name reused, original node deleted (update event)")
148+
select {
149+
case deleted <- struct{}{}:
150+
default:
151+
}
152+
}
153+
}
154+
},
155+
DeleteFunc: func(obj any) {
156+
node, ok := obj.(*corev1.Node)
157+
if !ok {
158+
// Handle tombstone case
159+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
160+
if !ok {
161+
return
162+
}
163+
node, ok = tombstone.Obj.(*corev1.Node)
164+
if !ok {
165+
return
166+
}
167+
}
168+
if node.Name == nodeName {
169+
log.Info("node deleted (delete event)")
170+
select {
171+
case deleted <- struct{}{}:
172+
default:
173+
}
174+
}
175+
},
176+
})
177+
if err != nil {
178+
return fmt.Errorf("failed to add event handler: %w", err)
179+
}
180+
defer func() {
181+
if err := informer.RemoveEventHandler(registration); err != nil {
182+
log.WithError(err).Warn("failed to remove event handler")
183+
}
184+
}()
185+
186+
// Wait for node to be deleted or timeout
187+
select {
188+
case <-deleted:
189+
return nil
190+
case err := <-errCh:
191+
return err
192+
case <-ctx.Done():
193+
return fmt.Errorf("timeout waiting for node to be deleted: %w", ctx.Err())
194+
}
195+
}
196+
197+
func (h *CheckNodeStatusHandler) checkNodeReady(ctx context.Context, log *logrus.Entry, req *castai.ActionCheckNodeStatus) error {
198+
return h.checkNodeReadyWithInformer(ctx, log, req)
199+
}
200+
201+
func (h *CheckNodeStatusHandler) checkNodeReadyWithInformer(ctx context.Context, log *logrus.Entry, req *castai.ActionCheckNodeStatus) error {
121202
timeout := 9 * time.Minute
122203
if req.WaitTimeoutSeconds != nil {
123204
timeout = time.Duration(*req.WaitTimeoutSeconds) * time.Second
124205
}
125206

126-
watchObject := metav1.SingleObject(metav1.ObjectMeta{
127-
Name: req.NodeName,
128-
})
129-
watchObject.TimeoutSeconds = lo.ToPtr(int64(timeout.Seconds()))
130-
131207
ctx, cancel := context.WithTimeout(ctx, timeout)
132208
defer cancel()
133209

134-
watch, err := h.clientset.CoreV1().Nodes().Watch(ctx, watchObject)
135-
if err != nil {
136-
return fmt.Errorf("creating node watch: %w", err)
210+
// Check if node is already ready in cache
211+
lister := h.informerManager.GetNodeLister()
212+
node, err := lister.Get(req.NodeName)
213+
if err == nil && h.isNodeReady(node, req.NodeID, req.ProviderId) {
214+
log.Info("node already ready in cache")
215+
return nil
137216
}
138-
defer watch.Stop()
139217

140-
for {
141-
select {
142-
case <-ctx.Done():
143-
return fmt.Errorf("node %s request timeout: %v %w", req.NodeName, timeout, ctx.Err())
144-
case r, ok := <-watch.ResultChan():
145-
if !ok {
146-
return fmt.Errorf("node %s request timeout: %v %w", req.NodeName, timeout, errNodeWatcherClosed)
218+
// Set up channel to receive notification when node is ready
219+
ready := make(chan struct{})
220+
errCh := make(chan error, 1)
221+
222+
informer := h.informerManager.GetNodeInformer()
223+
224+
// Register event handler to watch for node updates
225+
registration, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
226+
AddFunc: func(obj any) {
227+
node, ok := obj.(*corev1.Node)
228+
if !ok || node.Name != req.NodeName {
229+
return
230+
}
231+
if h.isNodeReady(node, req.NodeID, req.ProviderId) {
232+
log.Info("node became ready (add event)")
233+
select {
234+
case ready <- struct{}{}:
235+
default:
236+
}
237+
}
238+
},
239+
UpdateFunc: func(oldObj, newObj any) {
240+
node, ok := newObj.(*corev1.Node)
241+
if !ok || node.Name != req.NodeName {
242+
return
147243
}
148-
if node, ok := r.Object.(*corev1.Node); ok {
149-
if h.isNodeReady(node, req.NodeID, req.ProviderId) {
150-
return nil
244+
if h.isNodeReady(node, req.NodeID, req.ProviderId) {
245+
log.Info("node became ready (update event)")
246+
select {
247+
case ready <- struct{}{}:
248+
default:
151249
}
152250
}
251+
},
252+
})
253+
if err != nil {
254+
return fmt.Errorf("failed to add event handler: %w", err)
255+
}
256+
defer func() {
257+
if err := informer.RemoveEventHandler(registration); err != nil {
258+
log.WithError(err).Warn("failed to remove event handler")
153259
}
260+
}()
261+
262+
// Wait for node to be ready or timeout
263+
select {
264+
case <-ready:
265+
return nil
266+
case err := <-errCh:
267+
return err
268+
case <-ctx.Done():
269+
return fmt.Errorf("timeout waiting for node to be ready: %w", ctx.Err())
154270
}
155271
}
156272

0 commit comments

Comments
 (0)