
Commit d8460d2

multithreaded event processor (#317)
This replaces the single-threaded processing of events with parallel processing, solving the problem that occurs when NTH is busy or blocked (retrying an eviction) and ends up missing events for other nodes going down at the same time. Example: 3 nodes roll at a time because of batchSize or a spot interruption, and a deployment has a PDB limit of maxUnavailable: 1; that blocks NTH in an eviction retry loop, so it misses the third node's eviction. The number of workers is capped to prevent memory runaway.
1 parent 1319288 commit d8460d2
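The approach the commit message describes (a capped pool of event processors drawing from the interruption event store) boils down to a buffered channel used as a semaphore plus a sync.WaitGroup for shutdown. The following is a minimal, self-contained sketch of that pattern, not the handler's actual code; the event names, slot count, and sleep are placeholders.

// Minimal sketch, not NTH's actual code: a buffered channel caps the number
// of concurrent event processors, and a WaitGroup waits for in-flight work.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxWorkers = 10                 // analogous to the new "workers" setting
	workers := make(chan int, maxWorkers) // buffered channel used as a semaphore
	var wg sync.WaitGroup

	pending := []string{"node-1", "node-2", "node-3"} // hypothetical pending events

	for _, nodeName := range pending {
		select {
		case workers <- 1: // a slot is free: process this event concurrently
			wg.Add(1)
			go func(name string) {
				defer wg.Done()
				time.Sleep(100 * time.Millisecond) // stand-in for cordon/drain work
				fmt.Println("processed", name)
				<-workers // release the slot when done
			}(nodeName)
		default: // all slots busy: leave the event for the next polling tick
			fmt.Println("all workers busy, will retry:", nodeName)
		}
	}
	wg.Wait()
}

With the cap in place, one slow drain (for example, one blocked by a PodDisruptionBudget) occupies a single slot while events for other nodes are still picked up, which is exactly the scenario the commit message describes.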

File tree

8 files changed: +75, -47 lines changed

- cmd/node-termination-handler.go
- config/helm/aws-node-termination-handler/README.md
- config/helm/aws-node-termination-handler/templates/deployment.yaml
- config/helm/aws-node-termination-handler/values.yaml
- pkg/config/config.go
- pkg/interruptioneventstore/interruption-event-store.go
- pkg/monitor/sqsevent/sqs-monitor.go
- pkg/monitor/types.go

cmd/node-termination-handler.go

Lines changed: 61 additions & 45 deletions
@@ -18,6 +18,7 @@ import (
 	"os"
 	"os/signal"
 	"strings"
+	"sync"
 	"syscall"
 	"time"

@@ -194,16 +195,30 @@ func main() {
 	go watchForCancellationEvents(cancelChan, interruptionEventStore, node, metrics)
 	log.Log().Msg("Started watching for event cancellations")

+	var wg sync.WaitGroup
+
 	for range time.NewTicker(1 * time.Second).C {
 		select {
-		case _ = <-signalChan:
+		case <-signalChan:
 			// Exit interruption loop if a SIGTERM is received or the channel is closed
 			break
 		default:
-			drainOrCordonIfNecessary(interruptionEventStore, *node, nthConfig, nodeMetadata, metrics)
+			for event, ok := interruptionEventStore.GetActiveEvent(); ok && !event.InProgress; event, ok = interruptionEventStore.GetActiveEvent() {
+				select {
+				case interruptionEventStore.Workers <- 1:
+					event.InProgress = true
+					wg.Add(1)
+					go drainOrCordonIfNecessary(interruptionEventStore, event, *node, nthConfig, nodeMetadata, metrics, &wg)
+				default:
+					log.Warn().Msg("all workers busy, waiting")
+					break
+				}
+			}
 		}
 	}
 	log.Log().Msg("AWS Node Termination Handler is shutting down")
+	wg.Wait()
+	log.Debug().Msg("all event processors finished")
 }

 func handleRebootUncordon(nodeName string, interruptionEventStore *interruptioneventstore.Store, node node.Node) error {
@@ -254,59 +269,60 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int
 	}
 }

-func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics) {
-	if drainEvent, ok := interruptionEventStore.GetActiveEvent(); ok {
-		nodeName := drainEvent.NodeName
-		if drainEvent.PreDrainTask != nil {
-			err := drainEvent.PreDrainTask(*drainEvent, node)
-			if err != nil {
-				log.Log().Err(err).Msg("There was a problem executing the pre-drain task")
-			}
-			metrics.NodeActionsInc("pre-drain", nodeName, err)
+func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics, wg *sync.WaitGroup) {
+	defer wg.Done()
+	nodeName := drainEvent.NodeName
+	if drainEvent.PreDrainTask != nil {
+		err := drainEvent.PreDrainTask(*drainEvent, node)
+		if err != nil {
+			log.Log().Err(err).Msg("There was a problem executing the pre-drain task")
 		}
+		metrics.NodeActionsInc("pre-drain", nodeName, err)
+	}

-		if nthConfig.CordonOnly || drainEvent.IsRebalanceRecommendation() {
-			err := node.Cordon(nodeName)
-			if err != nil {
-				if errors.IsNotFound(err) {
-					log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
-				} else {
-					log.Log().Err(err).Msg("There was a problem while trying to cordon the node")
-					os.Exit(1)
-				}
+	if nthConfig.CordonOnly || drainEvent.IsRebalanceRecommendation() {
+		err := node.Cordon(nodeName)
+		if err != nil {
+			if errors.IsNotFound(err) {
+				log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
 			} else {
-				log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned")
-				err = node.LogPods(nodeName)
-				if err != nil {
-					log.Log().Err(err).Msg("There was a problem while trying to log all pod names on the node")
-				}
-				metrics.NodeActionsInc("cordon", nodeName, err)
+				log.Log().Err(err).Msg("There was a problem while trying to cordon the node")
+				os.Exit(1)
 			}
 		} else {
-			err := node.CordonAndDrain(nodeName)
+			log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned")
+			err = node.LogPods(nodeName)
 			if err != nil {
-				if errors.IsNotFound(err) {
-					log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
-				} else {
-					log.Log().Err(err).Msg("There was a problem while trying to cordon and drain the node")
-					os.Exit(1)
-				}
+				log.Log().Err(err).Msg("There was a problem while trying to log all pod names on the node")
+			}
+			metrics.NodeActionsInc("cordon", nodeName, err)
+		}
+	} else {
+		err := node.CordonAndDrain(nodeName)
+		if err != nil {
+			if errors.IsNotFound(err) {
+				log.Warn().Err(err).Msgf("node '%s' not found in the cluster", nodeName)
 			} else {
-				log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned and drained")
-				metrics.NodeActionsInc("cordon-and-drain", nodeName, err)
+				log.Log().Err(err).Msg("There was a problem while trying to cordon and drain the node")
+				os.Exit(1)
 			}
+		} else {
+			log.Log().Str("node_name", nodeName).Msg("Node successfully cordoned and drained")
+			metrics.NodeActionsInc("cordon-and-drain", nodeName, err)
 		}
+	}

-		interruptionEventStore.MarkAllAsDrained(nodeName)
-		if nthConfig.WebhookURL != "" {
-			webhook.Post(nodeMetadata, drainEvent, nthConfig)
-		}
-		if drainEvent.PostDrainTask != nil {
-			err := drainEvent.PostDrainTask(*drainEvent, node)
-			if err != nil {
-				log.Err(err).Msg("There was a problem executing the post-drain task")
-			}
-			metrics.NodeActionsInc("post-drain", nodeName, err)
+	interruptionEventStore.MarkAllAsDrained(nodeName)
+	if nthConfig.WebhookURL != "" {
+		webhook.Post(nodeMetadata, drainEvent, nthConfig)
+	}
+	if drainEvent.PostDrainTask != nil {
+		err := drainEvent.PostDrainTask(*drainEvent, node)
+		if err != nil {
+			log.Err(err).Msg("There was a problem executing the post-drain task")
 		}
+		metrics.NodeActionsInc("post-drain", nodeName, err)
 	}
+	<-interruptionEventStore.Workers
+
 }

config/helm/aws-node-termination-handler/README.md

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,6 @@ Parameter | Description | Default
 `podMonitor.sampleLimit` | Number of scraped samples accepted | `5000`
 `podMonitor.labels` | Additional PodMonitor metadata labels | `{}`

-
 ### AWS Node Termination Handler - Queue-Processor Mode Configuration

 Parameter | Description | Default
@@ -89,6 +88,7 @@ Parameter | Description | Default
 `awsRegion` | If specified, use the AWS region for AWS API calls, else NTH will try to find the region through AWS_REGION env var, IMDS, or the specified queue URL | ``
 `checkASGTagBeforeDraining` | If true, check that the instance is tagged with "aws-node-termination-handler/managed" as the key before draining the node | `true`
 `managedAsgTag` | The tag to ensure is on a node if checkASGTagBeforeDraining is true | `aws-node-termination-handler/managed`
+`workers` | The maximum amount of parallel event processors | `10`

 ### AWS Node Termination Handler - IMDS Mode Configuration

config/helm/aws-node-termination-handler/templates/deployment.yaml

Lines changed: 2 additions & 0 deletions
@@ -144,6 +144,8 @@ spec:
             value: {{ .Values.checkASGTagBeforeDraining | quote }}
           - name: MANAGED_ASG_TAG
             value: {{ .Values.managedAsgTag | quote }}
+          - name: WORKERS
+            value: {{ .Values.workers | quote }}
           resources:
             {{- toYaml .Values.resources | nindent 12 }}
           {{- if .Values.enablePrometheusServer }}

config/helm/aws-node-termination-handler/values.yaml

Lines changed: 3 additions & 0 deletions
@@ -188,3 +188,6 @@ windowsUpdateStrategy: ""
 # If you have disabled IMDSv1 and are relying on IMDSv2, you'll need to increase the IP hop count to 2 before switching this to false
 # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
 useHostNetwork: true
+
+# The maximal amount of parallel event processors to handle concurrent events
+workers: 10

pkg/config/config.go

Lines changed: 4 additions & 0 deletions
@@ -70,6 +70,8 @@ const (
 	logLevelDefault           = "INFO"
 	uptimeFromFileConfigKey   = "UPTIME_FROM_FILE"
 	uptimeFromFileDefault     = ""
+	workersConfigKey          = "WORKERS"
+	workersDefault            = 10
 	// prometheus
 	enablePrometheusDefault   = false
 	enablePrometheusConfigKey = "ENABLE_PROMETHEUS_SERVER"
@@ -116,6 +118,7 @@ type Config struct {
 	AWSRegion   string
 	AWSEndpoint string
 	QueueURL    string
+	Workers     int
 	AWSSession  *session.Session
 }

@@ -162,6 +165,7 @@ func ParseCliArgs() (config Config, err error) {
 	flag.StringVar(&config.AWSRegion, "aws-region", getEnv(awsRegionConfigKey, ""), "If specified, use the AWS region for AWS API calls")
 	flag.StringVar(&config.AWSEndpoint, "aws-endpoint", getEnv(awsEndpointConfigKey, ""), "[testing] If specified, use the AWS endpoint to make API calls")
 	flag.StringVar(&config.QueueURL, "queue-url", getEnv(queueURLConfigKey, ""), "Listens for messages on the specified SQS queue URL")
+	flag.IntVar(&config.Workers, "workers", getIntEnv(workersConfigKey, workersDefault), "The amount of parallel event processors.")

 	flag.Parse()

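The new flag reads its default from the WORKERS environment variable through a getIntEnv helper that is defined elsewhere in pkg/config and not shown in this diff. A minimal sketch of what such a helper could look like, assuming it falls back to the default when the variable is unset or not a valid integer (the committed implementation may behave differently, for example by failing hard on a bad value):

// Sketch of a getIntEnv-style helper; assumed behavior, not the committed code.
package config

import (
	"os"
	"strconv"
)

// getIntEnv returns the integer value of an environment variable, or the
// fallback when the variable is unset or cannot be parsed as an integer.
func getIntEnv(key string, fallback int) int {
	value, ok := os.LookupEnv(key)
	if !ok {
		return fallback
	}
	parsed, err := strconv.Atoi(value)
	if err != nil {
		return fallback
	}
	return parsed
}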
pkg/interruptioneventstore/interruption-event-store.go

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,7 @@ type Store struct {
 	interruptionEventStore map[string]*monitor.InterruptionEvent
 	ignoredEvents          map[string]struct{}
 	atLeastOneEvent        bool
+	Workers                chan int
 }

 // New Creates a new interruption event store
@@ -38,6 +39,7 @@ func New(nthConfig config.Config) *Store {
 		NthConfig:              nthConfig,
 		interruptionEventStore: make(map[string]*monitor.InterruptionEvent),
 		ignoredEvents:          make(map[string]struct{}),
+		Workers:                make(chan int, nthConfig.Workers),
 	}
 }

pkg/monitor/sqsevent/sqs-monitor.go

Lines changed: 1 addition & 1 deletion
@@ -196,7 +196,7 @@ func (m SQSMonitor) retrieveNodeName(instanceID string) (string, error) {
 	}
 	// anything except running might not contain PrivateDnsName
 	if state != ec2.InstanceStateNameRunning {
-		return "", ErrNodeStateNotRunning
+		return "", fmt.Errorf("node: '%s' in state '%s': %w", instanceID, state, ErrNodeStateNotRunning)
 	}
 	return "", fmt.Errorf("unable to retrieve PrivateDnsName name for '%s' in state '%s'", instanceID, state)
 }

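Because the new return value wraps ErrNodeStateNotRunning with %w, callers keep the instance ID and state in the message while still being able to match the sentinel error. A small illustrative check, not taken from this commit (the sentinel's message here is a stand-in):

package main

import (
	"errors"
	"fmt"
)

// Stand-in for the sentinel error defined by the SQS monitor package.
var ErrNodeStateNotRunning = errors.New("node state is not running")

func main() {
	instanceID, state := "i-0123456789abcdef0", "stopped"
	// Wrapping with %w preserves the sentinel for errors.Is checks.
	err := fmt.Errorf("node: '%s' in state '%s': %w", instanceID, state, ErrNodeStateNotRunning)

	if errors.Is(err, ErrNodeStateNotRunning) {
		fmt.Println("skipping instance that is not running:", err)
	}
}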
pkg/monitor/types.go

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ type InterruptionEvent struct {
 	StartTime     time.Time
 	EndTime       time.Time
 	Drained       bool
+	InProgress    bool
 	PreDrainTask  DrainTask `json:"-"`
 	PostDrainTask DrainTask `json:"-"`
 }

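The new InProgress flag is what the updated main loop checks (see the cmd/node-termination-handler.go hunk above), so an event that already has a worker goroutine is not dispatched again on the next one-second tick. A stripped-down illustration of that idea, using a hypothetical event type rather than the store's real API:

package main

import "fmt"

// Hypothetical event type illustrating the purpose of an InProgress flag;
// the real InterruptionEvent carries many more fields.
type event struct {
	NodeName   string
	InProgress bool
}

// dispatch marks an event as in progress exactly once, so a polling loop
// that sees the same event again will skip it.
func dispatch(e *event) bool {
	if e.InProgress {
		return false // already being handled by a worker
	}
	e.InProgress = true
	return true
}

func main() {
	e := &event{NodeName: "node-1"}
	fmt.Println(dispatch(e)) // true: first tick starts a worker
	fmt.Println(dispatch(e)) // false: later ticks skip it
}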