Skip to content

Commit a43be02

Browse files
authored
CC refactor iteration 4 (#224)
1 parent 83d21a6 commit a43be02

File tree

7 files changed

+63
-54
lines changed

7 files changed

+63
-54
lines changed

internal/actions/delete_node_handler.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type DeleteNodeHandler struct {
5151
cfg deleteNodeConfig
5252
}
5353

54+
// nolint: gocognit
5455
func (h *DeleteNodeHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
5556
if action == nil {
5657
return fmt.Errorf("action is nil %w", k8s.ErrAction)
@@ -115,7 +116,7 @@ func (h *DeleteNodeHandler) Handle(ctx context.Context, action *castai.ClusterAc
115116
}
116117

117118
podsListingBackoff := waitext.NewConstantBackoff(h.cfg.podsTerminationWait)
118-
var pods []v1.Pod
119+
var pods []*v1.Pod
119120
err = waitext.Retry(
120121
ctx,
121122
podsListingBackoff,
@@ -127,7 +128,10 @@ func (h *DeleteNodeHandler) Handle(ctx context.Context, action *castai.ClusterAc
127128
if err != nil {
128129
return true, err
129130
}
130-
pods = podList.Items
131+
pods = make([]*v1.Pod, len(podList.Items))
132+
for i := range podList.Items {
133+
pods[i] = &podList.Items[i]
134+
}
131135
return false, nil
132136
},
133137
func(err error) {

internal/actions/drain_node_handler.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ type DrainNodeHandler struct {
6464
}
6565

6666
type nodePods struct {
67-
toEvict []v1.Pod
68-
nonEvictable []v1.Pod
67+
toEvict []*v1.Pod
68+
nonEvictable []*v1.Pod
6969
}
7070

7171
func (h *DrainNodeHandler) Handle(ctx context.Context, action *castai.ClusterAction) error {
@@ -103,7 +103,7 @@ func (h *DrainNodeHandler) Handle(ctx context.Context, action *castai.ClusterAct
103103

104104
log.Info("cordoning node for draining")
105105

106-
if err := k8s.CordonNode(ctx, h.log, h.clientset, node); err != nil {
106+
if err = k8s.CordonNode(ctx, h.log, h.clientset, node); err != nil {
107107
return fmt.Errorf("cordoning node %q: %w", req.NodeName, err)
108108
}
109109

@@ -177,7 +177,7 @@ func (h *DrainNodeHandler) Handle(ctx context.Context, action *castai.ClusterAct
177177
// waitForVolumeDetachIfEnabled waits for VolumeAttachments to be deleted if the feature is enabled.
178178
// This is called after successful drain to give CSI drivers time to clean up volumes.
179179
// nonEvictablePods are pods that won't be evicted (DaemonSet, static) - their VolumeAttachments are excluded from waiting.
180-
func (h *DrainNodeHandler) waitForVolumeDetachIfEnabled(ctx context.Context, log logrus.FieldLogger, nodeName string, req *castai.ActionDrainNode, nonEvictablePods []v1.Pod) {
180+
func (h *DrainNodeHandler) waitForVolumeDetachIfEnabled(ctx context.Context, log logrus.FieldLogger, nodeName string, req *castai.ActionDrainNode, nonEvictablePods []*v1.Pod) {
181181
if !ShouldWaitForVolumeDetach(req) || h.vaWaiter == nil {
182182
return
183183
}
@@ -204,7 +204,7 @@ func (h *DrainNodeHandler) waitForVolumeDetachIfEnabled(ctx context.Context, log
204204
// The method will still wait for termination of other evicted pods first.
205205
// Returns non-evictable pods (DaemonSet, static).
206206
// A return value of (pods, nil) means all evictable pods on the node should be evicted and terminated.
207-
func (h *DrainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) ([]v1.Pod, error) {
207+
func (h *DrainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) ([]*v1.Pod, error) {
208208
nodePods, err := h.listNodePods(ctx, log, node)
209209
if err != nil {
210210
return nil, err
@@ -257,7 +257,7 @@ func (h *DrainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLo
257257
// The method will still wait for termination of other deleted pods first.
258258
// Returns non-evictable pods (DaemonSet, static).
259259
// A return value of (pods, nil) means all evictable pods on the node should be deleted and terminated.
260-
func (h *DrainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node, options metav1.DeleteOptions) ([]v1.Pod, error) {
260+
func (h *DrainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node, options metav1.DeleteOptions) ([]*v1.Pod, error) {
261261
nodePods, err := h.listNodePods(ctx, log, node)
262262
if err != nil {
263263
return nil, err
@@ -338,11 +338,16 @@ func (h *DrainNodeHandler) listNodePods(ctx context.Context, log logrus.FieldLog
338338
return nil, fmt.Errorf("listing node %v pods: %w", node.Name, err)
339339
}
340340

341-
partitioned := k8s.PartitionPodsForEviction(pods.Items, h.cfg.castNamespace, h.cfg.skipDeletedTimeoutSeconds)
341+
podPtrs := make([]*v1.Pod, len(pods.Items))
342+
for i := range pods.Items {
343+
podPtrs[i] = &pods.Items[i]
344+
}
345+
346+
partitioned := k8s.PartitionPodsForEviction(podPtrs, h.cfg.castNamespace, h.cfg.skipDeletedTimeoutSeconds)
342347

343348
result := &nodePods{
344-
toEvict: make([]v1.Pod, 0, len(partitioned.Evictable)+len(partitioned.CastPods)),
345-
nonEvictable: make([]v1.Pod, 0, len(partitioned.NonEvictable)),
349+
toEvict: make([]*v1.Pod, 0, len(partitioned.Evictable)+len(partitioned.CastPods)),
350+
nonEvictable: make([]*v1.Pod, 0, len(partitioned.NonEvictable)),
346351
}
347352

348353
logCastPodsToEvict(log, partitioned.CastPods)
@@ -381,7 +386,7 @@ func (h *DrainNodeHandler) waitNodePodsTerminated(ctx context.Context, log logru
381386
return true, fmt.Errorf("listing %q pods to be terminated: %w", node.Name, err)
382387
}
383388

384-
podsNames := lo.Map(nodePods.toEvict, func(p v1.Pod, _ int) string {
389+
podsNames := lo.Map(nodePods.toEvict, func(p *v1.Pod, _ int) string {
385390
return fmt.Sprintf("%s/%s", p.Namespace, p.Name)
386391
})
387392

@@ -403,7 +408,7 @@ func (h *DrainNodeHandler) waitNodePodsTerminated(ctx context.Context, log logru
403408
)
404409
}
405410

406-
func logCastPodsToEvict(log logrus.FieldLogger, castPods []v1.Pod) {
411+
func logCastPodsToEvict(log logrus.FieldLogger, castPods []*v1.Pod) {
407412
if len(castPods) == 0 {
408413
return
409414
}

internal/actions/drain_node_handler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func TestLogCastPodsToEvict(t *testing.T) {
8383
t.Run("should log pods to evict", func(t *testing.T) {
8484
r := require.New(t)
8585
log, hook := test.NewNullLogger()
86-
pods := []v1.Pod{
86+
pods := []*v1.Pod{
8787
{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "ns1"}},
8888
{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "ns2"}},
8989
}
@@ -97,7 +97,7 @@ func TestLogCastPodsToEvict(t *testing.T) {
9797
r := require.New(t)
9898
log, hook := test.NewNullLogger()
9999

100-
var pods []v1.Pod
100+
var pods []*v1.Pod
101101
logCastPodsToEvict(log, pods)
102102

103103
r.Len(hook.Entries, 0)

internal/k8s/kubernetes.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,11 @@ const (
3333
)
3434

3535
var (
36-
ErrAction = errors.New("not valid action")
37-
ErrNodeNotFound = errors.New("node not found")
38-
ErrNodeDoesNotMatch = fmt.Errorf("node does not match")
39-
ErrNodeWatcherClosed = fmt.Errorf("node watcher closed, no more events will be received")
36+
ErrAction = errors.New("not valid action")
37+
ErrNodeNotFound = errors.New("node not found")
38+
ErrNodeDoesNotMatch = fmt.Errorf("node does not match")
39+
ErrProviderIDMismatch = errors.New("provider ID mismatch")
40+
ErrNodeWatcherClosed = fmt.Errorf("node watcher closed, no more events will be received")
4041
)
4142

4243
const (
@@ -179,7 +180,7 @@ func (c *Client) GetNodeByIDs(ctx context.Context, nodeName, nodeID, providerID
179180
// actionName might be used to distinguish what is the operation (for logs, debugging, etc.) but is optional.
180181
func (c *Client) ExecuteBatchPodActions(
181182
ctx context.Context,
182-
pods []v1.Pod,
183+
pods []*v1.Pod,
183184
action func(context.Context, v1.Pod) error,
184185
actionName string,
185186
) ([]*v1.Pod, []PodActionFailure) {
@@ -201,7 +202,7 @@ func (c *Client) ExecuteBatchPodActions(
201202
wg sync.WaitGroup
202203
)
203204

204-
logger.Debugf("Starting %d parallel tasks for %d pods: [%v]", parallelTasks, len(pods), lo.Map(pods, func(t v1.Pod, i int) string {
205+
logger.Debugf("Starting %d parallel tasks for %d pods: [%v]", parallelTasks, len(pods), lo.Map(pods, func(t *v1.Pod, i int) string {
205206
return fmt.Sprintf("%s/%s", t.Namespace, t.Name)
206207
}))
207208

@@ -226,7 +227,7 @@ func (c *Client) ExecuteBatchPodActions(
226227
}
227228

228229
for _, pod := range pods {
229-
taskChan <- pod
230+
taskChan <- *pod
230231
}
231232

232233
close(taskChan)
@@ -256,7 +257,6 @@ func (c *Client) EvictPod(ctx context.Context, pod v1.Pod, podEvictRetryDelay ti
256257
action := func(ctx context.Context) (bool, error) {
257258
var err error
258259

259-
c.log.Debugf("requesting eviction for pod %s/%s", pod.Namespace, pod.Name)
260260
if version == policyv1.SchemeGroupVersion {
261261
err = c.clientset.PolicyV1().Evictions(pod.Namespace).Evict(ctx, &policyv1.Eviction{
262262
ObjectMeta: metav1.ObjectMeta{
@@ -402,15 +402,15 @@ type PodActionFailure struct {
402402
}
403403

404404
type PartitionResult struct {
405-
Evictable []v1.Pod
406-
NonEvictable []v1.Pod
407-
CastPods []v1.Pod
405+
Evictable []*v1.Pod
406+
NonEvictable []*v1.Pod
407+
CastPods []*v1.Pod
408408
}
409409

410-
func PartitionPodsForEviction(pods []v1.Pod, castNamespace string, skipDeletedTimeoutSeconds int) *PartitionResult {
411-
castPods := make([]v1.Pod, 0)
412-
evictable := make([]v1.Pod, 0)
413-
nonEvictable := make([]v1.Pod, 0)
410+
func PartitionPodsForEviction(pods []*v1.Pod, castNamespace string, skipDeletedTimeoutSeconds int) *PartitionResult {
411+
castPods := make([]*v1.Pod, 0)
412+
evictable := make([]*v1.Pod, 0)
413+
nonEvictable := make([]*v1.Pod, 0)
414414

415415
for _, p := range pods {
416416
// Skip pods that have been recently removed.
@@ -424,7 +424,7 @@ func PartitionPodsForEviction(pods []v1.Pod, castNamespace string, skipDeletedTi
424424
continue
425425
}
426426

427-
if IsDaemonSetPod(&p) || IsStaticPod(&p) {
427+
if IsDaemonSetPod(p) || IsStaticPod(p) {
428428
nonEvictable = append(nonEvictable, p)
429429
continue
430430
}
@@ -502,7 +502,7 @@ func GetNodeByIDs(ctx context.Context, clientSet corev1client.NodeInterface, nod
502502
func ExecuteBatchPodActions(
503503
ctx context.Context,
504504
log logrus.FieldLogger,
505-
pods []v1.Pod,
505+
pods []*v1.Pod,
506506
action func(context.Context, v1.Pod) error,
507507
actionName string,
508508
) ([]*v1.Pod, []PodActionFailure) {

internal/k8s/kubernetes_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ func TestClient_ExecuteBatchPodActions(t *testing.T) {
380380

381381
tests := []struct {
382382
name string
383-
pods []v1.Pod
383+
pods []*v1.Pod
384384
action func(context.Context, v1.Pod) error
385385
actionName string
386386
wantSuccessCount int
@@ -389,15 +389,15 @@ func TestClient_ExecuteBatchPodActions(t *testing.T) {
389389
}{
390390
{
391391
name: "empty pod list",
392-
pods: []v1.Pod{},
392+
pods: []*v1.Pod{},
393393
action: func(ctx context.Context, pod v1.Pod) error { return nil },
394394
actionName: "test-action",
395395
wantSuccessCount: 0,
396396
wantFailureCount: 0,
397397
},
398398
{
399399
name: "all pods succeed",
400-
pods: []v1.Pod{
400+
pods: []*v1.Pod{
401401
{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"}},
402402
{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "default"}},
403403
{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "default"}},
@@ -409,7 +409,7 @@ func TestClient_ExecuteBatchPodActions(t *testing.T) {
409409
},
410410
{
411411
name: "all pods fail",
412-
pods: []v1.Pod{
412+
pods: []*v1.Pod{
413413
{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"}},
414414
{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "default"}},
415415
},
@@ -421,7 +421,7 @@ func TestClient_ExecuteBatchPodActions(t *testing.T) {
421421
},
422422
{
423423
name: "mixed success and failure",
424-
pods: []v1.Pod{
424+
pods: []*v1.Pod{
425425
{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"}},
426426
{ObjectMeta: metav1.ObjectMeta{Name: "pod-fail", Namespace: "default"}},
427427
{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "default"}},
@@ -439,7 +439,7 @@ func TestClient_ExecuteBatchPodActions(t *testing.T) {
439439
},
440440
{
441441
name: "default action name when empty",
442-
pods: []v1.Pod{
442+
pods: []*v1.Pod{
443443
{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"}},
444444
},
445445
action: func(ctx context.Context, pod v1.Pod) error { return fmt.Errorf("fail") },

internal/volume/detachment_waiter.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type DetachmentWaitOptions struct {
2929

3030
// PodsToExclude are pods whose VolumeAttachments should not be waited for
3131
// (e.g., DaemonSet pods, static pods that won't be evicted).
32-
PodsToExclude []v1.Pod
32+
PodsToExclude []*v1.Pod
3333
}
3434

3535
// DetachmentWaiter waits for VolumeAttachments to be detached from a node.
@@ -99,15 +99,15 @@ func (w *detachmentWaiter) Wait(
9999

100100
// getExcludedPVsForNode returns PV names that should NOT be waited for.
101101
// These are PVs used by podsToExclude (e.g., DaemonSets, static pods) since
102-
// those pods won't be evicted and would cause a deadlock waiting for their VAs.
102+
// those pods won't be evicted and would cause a deadlock waiting for their VolumeAttachments.
103103
func (w *detachmentWaiter) getExcludedPVsForNode(
104104
ctx context.Context,
105105
log logrus.FieldLogger,
106-
podsToExclude []v1.Pod,
106+
podsToExclude []*v1.Pod,
107107
) map[string]struct{} {
108108
excludedPVs := make(map[string]struct{})
109109
for i := range podsToExclude {
110-
pod := &podsToExclude[i]
110+
pod := podsToExclude[i]
111111
for _, vol := range pod.Spec.Volumes {
112112
if vol.PersistentVolumeClaim == nil {
113113
continue
@@ -126,8 +126,8 @@ func (w *detachmentWaiter) getExcludedPVsForNode(
126126

127127
// waitForVolumeDetach waits for all VolumeAttachments on the node to be deleted,
128128
// except those belonging to excludedPVs (e.g., DaemonSet pods).
129-
// Returns nil when all VAs are deleted.
130-
// Returns DetachmentError on timeout with the list of remaining VAs.
129+
// Returns nil when all VolumeAttachments are deleted.
130+
// Returns DetachmentError on timeout with the list of remaining VolumeAttachments.
131131
// Respects context cancellation.
132132
func (w *detachmentWaiter) waitForVolumeDetach(
133133
ctx context.Context,

internal/volume/detachment_waiter_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ import (
2020
"github.com/castai/cluster-controller/internal/informer"
2121
)
2222

23-
func newTestWaiterVAInformer(t *testing.T, vas []*storagev1.VolumeAttachment, additionalObjs ...runtime.Object) (cache.Indexer, kubernetes.Interface) {
23+
func newTestWaiterVAInformer(t *testing.T, volumeAttachments []*storagev1.VolumeAttachment, additionalObjs ...runtime.Object) (cache.Indexer, kubernetes.Interface) {
2424
t.Helper()
2525

26-
objs := make([]runtime.Object, 0, len(vas)+len(additionalObjs))
27-
for _, va := range vas {
26+
objs := make([]runtime.Object, 0, len(volumeAttachments)+len(additionalObjs))
27+
for _, va := range volumeAttachments {
2828
objs = append(objs, va)
2929
}
3030
objs = append(objs, additionalObjs...)
@@ -63,7 +63,7 @@ func vaStrPtr(s string) *string {
6363
func TestDetachmentWaiter_Wait(t *testing.T) {
6464
t.Parallel()
6565

66-
t.Run("should return immediately when no VAs on node", func(t *testing.T) {
66+
t.Run("should return immediately when no VolumeAttachments on node", func(t *testing.T) {
6767
synctest.Test(t, func(t *testing.T) {
6868
r := require.New(t)
6969
log := logrus.New()
@@ -79,7 +79,7 @@ func TestDetachmentWaiter_Wait(t *testing.T) {
7979
})
8080
})
8181

82-
t.Run("should complete when VAs are deleted", func(t *testing.T) {
82+
t.Run("should complete when VolumeAttachments are deleted", func(t *testing.T) {
8383
synctest.Test(t, func(t *testing.T) {
8484
r := require.New(t)
8585
log := logrus.New()
@@ -163,7 +163,7 @@ func TestDetachmentWaiter_Wait(t *testing.T) {
163163
})
164164
})
165165

166-
t.Run("should only wait for VAs on specified node", func(t *testing.T) {
166+
t.Run("should only wait for VolumeAttachments on specified node", func(t *testing.T) {
167167
synctest.Test(t, func(t *testing.T) {
168168
r := require.New(t)
169169
log := logrus.New()
@@ -199,7 +199,7 @@ func TestDetachmentWaiter_Wait(t *testing.T) {
199199
})
200200
})
201201

202-
t.Run("should exclude VAs from excluded pods", func(t *testing.T) {
202+
t.Run("should exclude VolumeAttachments from excluded pods", func(t *testing.T) {
203203
synctest.Test(t, func(t *testing.T) {
204204
r := require.New(t)
205205
log := logrus.New()
@@ -256,13 +256,13 @@ func TestDetachmentWaiter_Wait(t *testing.T) {
256256
err := waiter.Wait(context.Background(), log, DetachmentWaitOptions{
257257
NodeName: "node1",
258258
Timeout: 2 * time.Second,
259-
PodsToExclude: []v1.Pod{dsPod},
259+
PodsToExclude: []*v1.Pod{&dsPod},
260260
})
261261
r.NoError(err)
262262
})
263263
})
264264

265-
t.Run("should exclude VAs from static pods", func(t *testing.T) {
265+
t.Run("should exclude VolumeAttachments from static pods", func(t *testing.T) {
266266
synctest.Test(t, func(t *testing.T) {
267267
r := require.New(t)
268268
log := logrus.New()
@@ -319,7 +319,7 @@ func TestDetachmentWaiter_Wait(t *testing.T) {
319319
err := waiter.Wait(context.Background(), log, DetachmentWaitOptions{
320320
NodeName: "node1",
321321
Timeout: 2 * time.Second,
322-
PodsToExclude: []v1.Pod{staticPod},
322+
PodsToExclude: []*v1.Pod{&staticPod},
323323
})
324324
r.NoError(err)
325325
})

0 commit comments

Comments
 (0)