Skip to content

Commit 4b1812b

Browse files
authored
feat: improve api request count for node drain action (#36)
- Batch pod requests in chunks of 5 to avoid overloading the control-plane API.
- When waiting for pods to terminate on the node, list the node's pods instead of polling each pod individually.
- Expose k8s clientset rate-limiting config. Defaults: 25 QPS, 150 Burst.
1 parent 69d4f28 commit 4b1812b

File tree

7 files changed

+128
-123
lines changed

7 files changed

+128
-123
lines changed

actions/drain_node_handler.go

Lines changed: 96 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,24 @@ import (
2020
"github.com/castai/cluster-controller/castai"
2121
)
2222

23-
var (
24-
errPodPresent = errors.New("pod is still present")
25-
)
26-
2723
type drainNodeConfig struct {
28-
podsDeleteTimeout time.Duration
29-
podDeleteRetries uint64
30-
podDeleteRetryDelay time.Duration
31-
podEvictRetryDelay time.Duration
24+
podsDeleteTimeout time.Duration
25+
podDeleteRetries uint64
26+
podDeleteRetryDelay time.Duration
27+
podEvictRetryDelay time.Duration
28+
podsTerminationWaitRetryDelay time.Duration
3229
}
3330

3431
func newDrainNodeHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
3532
return &drainNodeHandler{
3633
log: log,
3734
clientset: clientset,
3835
cfg: drainNodeConfig{
39-
podsDeleteTimeout: 5 * time.Minute,
40-
podDeleteRetries: 5,
41-
podDeleteRetryDelay: 5 * time.Second,
42-
podEvictRetryDelay: 5 * time.Second,
36+
podsDeleteTimeout: 5 * time.Minute,
37+
podDeleteRetries: 5,
38+
podDeleteRetryDelay: 5 * time.Second,
39+
podEvictRetryDelay: 5 * time.Second,
40+
podsTerminationWaitRetryDelay: 10 * time.Second,
4341
},
4442
}
4543
}
@@ -73,19 +71,10 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
7371
return fmt.Errorf("tainting node %q: %w", req.NodeName, err)
7472
}
7573

76-
allNodePods, err := h.listNodePods(ctx, node)
77-
if err != nil {
78-
return fmt.Errorf("listing pods for node %q: %w", req.NodeName, err)
79-
}
80-
81-
podsToEvict := lo.Filter(allNodePods.Items, func(pod v1.Pod, _ int) bool {
82-
return !isDaemonSetPod(&pod) && !isStaticPod(&pod)
83-
})
84-
8574
// First try to evict pods gracefully.
8675
evictCtx, evictCancel := context.WithTimeout(ctx, time.Duration(req.DrainTimeoutSeconds)*time.Second)
8776
defer evictCancel()
88-
err = h.evictPods(evictCtx, log, podsToEvict)
77+
err = h.evictNodePods(evictCtx, log, node)
8978
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
9079
return err
9180
}
@@ -97,7 +86,7 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
9786
// If force is set and evict timeout exceeded delete pods.
9887
deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout)
9988
defer deleteCancel()
100-
if err := h.deletePods(deleteCtx, log, podsToEvict); err != nil {
89+
if err := h.deleteNodePods(deleteCtx, log, node); err != nil {
10190
return err
10291
}
10392
}
@@ -107,74 +96,71 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
10796
return nil
10897
}
10998

110-
func (h *drainNodeHandler) deletePods(ctx context.Context, log logrus.FieldLogger, pods []v1.Pod) error {
111-
log.Infof("forcefully deleting %d pods", len(pods))
112-
113-
g, ctx := errgroup.WithContext(ctx)
114-
for _, pod := range pods {
115-
pod := pod
116-
117-
g.Go(func() error {
118-
err := h.deletePod(ctx, pod)
119-
if err != nil {
120-
return err
121-
}
122-
return h.waitPodTerminated(ctx, log, pod)
123-
})
99+
func (h *drainNodeHandler) taintNode(ctx context.Context, node *v1.Node) error {
100+
if node.Spec.Unschedulable {
101+
return nil
124102
}
125103

126-
if err := g.Wait(); err != nil {
127-
return fmt.Errorf("deleting pods: %w", err)
104+
err := patchNode(ctx, h.clientset, node, func(n *v1.Node) error {
105+
n.Spec.Unschedulable = true
106+
return nil
107+
})
108+
if err != nil {
109+
return fmt.Errorf("patching node unschedulable: %w", err)
128110
}
129-
130111
return nil
131112
}
132113

133-
func (h *drainNodeHandler) deletePod(ctx context.Context, pod v1.Pod) error {
134-
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podDeleteRetryDelay), h.cfg.podDeleteRetries), ctx) // nolint:gomnd
135-
action := func() error {
136-
err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
137-
if err != nil {
138-
// Pod is not found - ignore.
139-
if apierrors.IsNotFound(err) {
140-
return nil
141-
}
114+
func (h *drainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error {
115+
pods, err := h.listNodePodsToEvict(ctx, node)
116+
if err != nil {
117+
return err
118+
}
142119

143-
// Pod is misconfigured - stop retry.
144-
if apierrors.IsInternalError(err) {
145-
return backoff.Permanent(err)
146-
}
147-
}
120+
log.Infof("evicting %d pods", len(pods))
148121

149-
// Other errors - retry.
122+
if err := h.sendPodsRequests(ctx, pods, h.evictPod); err != nil {
123+
return fmt.Errorf("sending evict pods requests: %w", err)
124+
}
125+
126+
return h.waitNodePodsTerminated(ctx, node)
127+
}
128+
129+
func (h *drainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error {
130+
pods, err := h.listNodePodsToEvict(ctx, node)
131+
if err != nil {
150132
return err
151133
}
152-
if err := backoff.Retry(action, b); err != nil {
153-
return fmt.Errorf("deleting pod %s in namespace %s: %w", pod.Name, pod.Namespace, err)
134+
135+
log.Infof("forcefully deleting %d pods", len(pods))
136+
137+
if err := h.sendPodsRequests(ctx, pods, h.deletePod); err != nil {
138+
return fmt.Errorf("sending delete pods requests: %w", err)
154139
}
155-
return nil
140+
141+
return h.waitNodePodsTerminated(ctx, node)
156142
}
157143

158-
// taintNode to make it unshedulable.
159-
func (h *drainNodeHandler) taintNode(ctx context.Context, node *v1.Node) error {
160-
if node.Spec.Unschedulable {
161-
return nil
162-
}
144+
func (h *drainNodeHandler) sendPodsRequests(ctx context.Context, pods []v1.Pod, f func(context.Context, v1.Pod) error) error {
145+
const batchSize = 5
163146

164-
err := patchNode(ctx, h.clientset, node, func(n *v1.Node) error {
165-
n.Spec.Unschedulable = true
166-
return nil
167-
})
168-
if err != nil {
169-
return fmt.Errorf("patching node unschedulable: %w", err)
147+
for _, batch := range lo.Chunk(pods, batchSize) {
148+
g, ctx := errgroup.WithContext(ctx)
149+
for _, pod := range batch {
150+
pod := pod
151+
g.Go(func() error { return f(ctx, pod) })
152+
}
153+
if err := g.Wait(); err != nil {
154+
return err
155+
}
170156
}
157+
171158
return nil
172159
}
173160

174-
// listNodePods returns a list of all pods scheduled on the provided node.
175-
func (h *drainNodeHandler) listNodePods(ctx context.Context, node *v1.Node) (*v1.PodList, error) {
161+
func (h *drainNodeHandler) listNodePodsToEvict(ctx context.Context, node *v1.Node) ([]v1.Pod, error) {
176162
var pods *v1.PodList
177-
err := backoff.Retry(func() error {
163+
if err := backoff.Retry(func() error {
178164
p, err := h.clientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
179165
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String(),
180166
})
@@ -183,58 +169,28 @@ func (h *drainNodeHandler) listNodePods(ctx context.Context, node *v1.Node) (*v1
183169
}
184170
pods = p
185171
return nil
186-
}, defaultBackoff(ctx))
187-
return pods, err
188-
}
189-
190-
func (h *drainNodeHandler) evictPods(ctx context.Context, log logrus.FieldLogger, pods []v1.Pod) error {
191-
log.Infof("evicting %d pods", len(pods))
192-
193-
g, ctx := errgroup.WithContext(ctx)
194-
for _, pod := range pods {
195-
pod := pod
196-
197-
g.Go(func() error {
198-
err := h.evictPod(ctx, pod)
199-
if err != nil {
200-
return err
201-
}
202-
return h.waitPodTerminated(ctx, log, pod)
203-
})
172+
}, defaultBackoff(ctx)); err != nil {
173+
return nil, fmt.Errorf("listing node %v pods: %w", node.Name, err)
204174
}
205175

206-
if err := g.Wait(); err != nil {
207-
return fmt.Errorf("evicting pods: %w", err)
208-
}
176+
podsToEvict := lo.Filter(pods.Items, func(pod v1.Pod, _ int) bool {
177+
return !isDaemonSetPod(&pod) && !isStaticPod(&pod)
178+
})
209179

210-
return nil
180+
return podsToEvict, nil
211181
}
212182

213-
func (h *drainNodeHandler) waitPodTerminated(ctx context.Context, log logrus.FieldLogger, pod v1.Pod) error {
214-
b := backoff.WithContext(backoff.NewConstantBackOff(5*time.Second), ctx) // nolint:gomnd
215-
216-
err := backoff.Retry(func() error {
217-
p, err := h.clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
218-
if err != nil && apierrors.IsNotFound(err) {
219-
return nil
220-
}
183+
func (h *drainNodeHandler) waitNodePodsTerminated(ctx context.Context, node *v1.Node) error {
184+
return backoff.Retry(func() error {
185+
pods, err := h.listNodePodsToEvict(ctx, node)
221186
if err != nil {
222-
return err
187+
return fmt.Errorf("waiting for node %q pods to be terminated: %w", node.Name, err)
223188
}
224-
// replicaSets will recreate pods with equal name and namespace, therefore we compare UIDs
225-
if p.GetUID() == pod.GetUID() {
226-
return errPodPresent
189+
if len(pods) > 0 {
190+
return fmt.Errorf("waiting for %d pods to be terminated on node %v", len(pods), node.Name)
227191
}
228192
return nil
229-
}, b)
230-
if err != nil && errors.Is(err, errPodPresent) {
231-
log.Infof("timeout waiting for pod %s in namespace %s to terminate", pod.Name, pod.Namespace)
232-
return nil
233-
}
234-
if err != nil {
235-
return fmt.Errorf("waiting for pod %s in namespace %s termination: %w", pod.Name, pod.Namespace, err)
236-
}
237-
return nil
193+
}, backoff.WithContext(backoff.NewConstantBackOff(h.cfg.podsTerminationWaitRetryDelay), ctx))
238194
}
239195

240196
// evictPod from the k8s node. Error handling is based on eviction api documentation:
@@ -274,6 +230,31 @@ func (h *drainNodeHandler) evictPod(ctx context.Context, pod v1.Pod) error {
274230
return nil
275231
}
276232

233+
func (h *drainNodeHandler) deletePod(ctx context.Context, pod v1.Pod) error {
234+
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podDeleteRetryDelay), h.cfg.podDeleteRetries), ctx) // nolint:gomnd
235+
action := func() error {
236+
err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
237+
if err != nil {
238+
// Pod is not found - ignore.
239+
if apierrors.IsNotFound(err) {
240+
return nil
241+
}
242+
243+
// Pod is misconfigured - stop retry.
244+
if apierrors.IsInternalError(err) {
245+
return backoff.Permanent(err)
246+
}
247+
}
248+
249+
// Other errors - retry.
250+
return err
251+
}
252+
if err := backoff.Retry(action, b); err != nil {
253+
return fmt.Errorf("deleting pod %s in namespace %s: %w", pod.Name, pod.Namespace, err)
254+
}
255+
return nil
256+
}
257+
277258
func isDaemonSetPod(p *v1.Pod) bool {
278259
return isControlledBy(p, "DaemonSet")
279260
}

actions/drain_node_handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func TestDrainNodeHandler(t *testing.T) {
103103
}
104104

105105
err := h.Handle(context.Background(), req)
106-
r.EqualError(err, "evicting pods: evicting pod pod1 in namespace default: internal")
106+
r.EqualError(err, "sending evict pods requests: evicting pod pod1 in namespace default: internal")
107107

108108
n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
109109
r.NoError(err)

config/config.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type Config struct {
1111
Log Log
1212
API API
1313
Kubeconfig string
14+
KubeClient KubeClient
1415
ClusterID string
1516
PprofPort int
1617
LeaderElection LeaderElection
@@ -32,6 +33,15 @@ type LeaderElection struct {
3233
LockName string
3334
}
3435

36+
type KubeClient struct {
37+
// K8S client rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
38+
// smoothed qps rate of 'qps'.
39+
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
40+
// The maximum number of tokens in the bucket is capped at 'burst'.
41+
QPS int
42+
Burst int
43+
}
44+
3545
var cfg *Config
3646

3747
// Get configuration bound to environment variables.
@@ -45,6 +55,8 @@ func Get() Config {
4555
_ = viper.BindEnv("api.url", "API_URL")
4656
_ = viper.BindEnv("clusterid", "CLUSTER_ID")
4757
_ = viper.BindEnv("kubeconfig")
58+
_ = viper.BindEnv("kubeclient.qps", "KUBECLIENT_QPS")
59+
_ = viper.BindEnv("kubeclient.burst", "KUBECLIENT_BURST")
4860
_ = viper.BindEnv("pprofport", "PPROF_PORT")
4961
_ = viper.BindEnv("leaderelection.enabled", "LEADER_ELECTION_ENABLED")
5062
_ = viper.BindEnv("leaderelection.namespace", "LEADER_ELECTION_NAMESPACE")
@@ -79,6 +91,12 @@ func Get() Config {
7991
required("LEADER_ELECTION_LOCK_NAME")
8092
}
8193
}
94+
if cfg.KubeClient.QPS == 0 {
95+
cfg.KubeClient.QPS = 25
96+
}
97+
if cfg.KubeClient.Burst == 0 {
98+
cfg.KubeClient.Burst = 150
99+
}
82100

83101
return *cfg
84102
}

config/config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,6 @@ func TestConfig(t *testing.T) {
2525
require.Equal(t, true, cfg.LeaderElection.Enabled)
2626
require.Equal(t, "castai-agent", cfg.LeaderElection.Namespace)
2727
require.Equal(t, "castai-cluster-controller", cfg.LeaderElection.LockName)
28+
require.Equal(t, 25, cfg.KubeClient.QPS)
29+
require.Equal(t, 150, cfg.KubeClient.Burst)
2830
}

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ require (
5555
github.com/fsnotify/fsnotify v1.5.1 // indirect
5656
github.com/go-errors/errors v1.0.1 // indirect
5757
github.com/go-gorp/gorp/v3 v3.0.2 // indirect
58-
github.com/go-logr/logr v1.2.2 // indirect
58+
github.com/go-logr/logr v1.2.3 // indirect
5959
github.com/go-openapi/jsonpointer v0.19.5 // indirect
6060
github.com/go-openapi/jsonreference v0.19.5 // indirect
6161
github.com/go-openapi/swag v0.19.14 // indirect
@@ -127,7 +127,7 @@ require (
127127
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
128128
golang.org/x/net v0.0.0-20220107192237-5cfca573fb4d // indirect
129129
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
130-
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
130+
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
131131
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
132132
golang.org/x/text v0.3.7 // indirect
133133
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,9 @@ github.com/go-logr/logr v0.3.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTg
458458
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
459459
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
460460
github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
461-
github.com/go-logr/logr v1.2.2 h1:ahHml/yUpnlb96Rp8HCvtYVPY8ZYpxq3g7UYchIYwbs=
462461
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
462+
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
463+
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
463464
github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI=
464465
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
465466
github.com/go-logr/zapr v0.2.0/go.mod h1:qhKdvif7YF5GI9NWEpyxTSSBdGmzkNguibrdCNVPunU=
@@ -1477,8 +1478,9 @@ golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBc
14771478
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
14781479
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
14791480
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
1480-
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
14811481
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
1482+
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc=
1483+
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
14821484
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
14831485
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
14841486
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

0 commit comments

Comments (0)