Skip to content

Commit f718727

Browse files
authored
Sqs deletion issue 360 (#364)
* Refactor sqs message handling Process all messages received one at a time * Update tests for error handling The existing tests all assumed that errors should be returned from Monitor() if something fails. Now those are logged and no event is generated. Refactor tests to remove go functions and use a channel with non-zero capacity to buffer generated events. * Return error if no messages can be processed If none of the messages received from SQS can be processed, return an error. This will allow the NTH to detect repeated issues processing the queue.
1 parent e14d45f commit f718727

6 files changed

+280
-127
lines changed

pkg/monitor/sqsevent/asg-lifecycle-event.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type LifecycleDetail struct {
5757
LifecycleTransition string `json:"LifecycleTransition"`
5858
}
5959

60-
func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, messages []*sqs.Message) (monitor.InterruptionEvent, error) {
60+
func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (monitor.InterruptionEvent, error) {
6161
lifecycleDetail := &LifecycleDetail{}
6262
err := json.Unmarshal(event.Detail, lifecycleDetail)
6363
if err != nil {
@@ -94,7 +94,7 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, me
9494
log.Info().Msgf("Completed ASG Lifecycle Hook (%s) for instance %s",
9595
lifecycleDetail.LifecycleHookName,
9696
lifecycleDetail.EC2InstanceID)
97-
errs := m.deleteMessages(messages)
97+
errs := m.deleteMessages([]*sqs.Message{message})
9898
if errs != nil {
9999
return errs[0]
100100
}
@@ -111,7 +111,7 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event EventBridgeEvent, me
111111

112112
if nodeName == "" {
113113
log.Info().Msg("Node name is empty, assuming instance was already terminated, deleting queue message")
114-
errs := m.deleteMessages(messages)
114+
errs := m.deleteMessages([]*sqs.Message{message})
115115
if errs != nil {
116116
log.Warn().Errs("errors", errs).Msg("There was an error deleting the messages")
117117
}

pkg/monitor/sqsevent/ec2-state-change-event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type EC2StateChangeDetail struct {
5050

5151
const instanceStatesToDrain = "stopping,stopped,shutting-down,terminated"
5252

53-
func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event EventBridgeEvent, messages []*sqs.Message) (monitor.InterruptionEvent, error) {
53+
func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (monitor.InterruptionEvent, error) {
5454
ec2StateChangeDetail := &EC2StateChangeDetail{}
5555
err := json.Unmarshal(event.Detail, ec2StateChangeDetail)
5656
if err != nil {
@@ -75,7 +75,7 @@ func (m SQSMonitor) ec2StateChangeToInterruptionEvent(event EventBridgeEvent, me
7575
Description: fmt.Sprintf("EC2 State Change event received. Instance went into %s at %s \n", ec2StateChangeDetail.State, event.getTime()),
7676
}
7777
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
78-
errs := m.deleteMessages([]*sqs.Message{messages[0]})
78+
errs := m.deleteMessages([]*sqs.Message{message})
7979
if errs != nil {
8080
return errs[0]
8181
}

pkg/monitor/sqsevent/rebalance-recommendation-event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type RebalanceRecommendationDetail struct {
4646
InstanceID string `json:"instance-id"`
4747
}
4848

49-
func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event EventBridgeEvent, messages []*sqs.Message) (monitor.InterruptionEvent, error) {
49+
func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (monitor.InterruptionEvent, error) {
5050
rebalanceRecDetail := &RebalanceRecommendationDetail{}
5151
err := json.Unmarshal(event.Detail, rebalanceRecDetail)
5252
if err != nil {
@@ -67,7 +67,7 @@ func (m SQSMonitor) rebalanceRecommendationToInterruptionEvent(event EventBridge
6767
Description: fmt.Sprintf("Rebalance recommendation event received. Instance will be cordoned at %s \n", event.getTime()),
6868
}
6969
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
70-
errs := m.deleteMessages(messages)
70+
errs := m.deleteMessages([]*sqs.Message{message})
7171
if errs != nil {
7272
return errs[0]
7373
}

pkg/monitor/sqsevent/spot-itn-event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type SpotInterruptionDetail struct {
4848
InstanceAction string `json:"instance-action"`
4949
}
5050

51-
func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event EventBridgeEvent, messages []*sqs.Message) (monitor.InterruptionEvent, error) {
51+
func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event EventBridgeEvent, message *sqs.Message) (monitor.InterruptionEvent, error) {
5252
spotInterruptionDetail := &SpotInterruptionDetail{}
5353
err := json.Unmarshal(event.Detail, spotInterruptionDetail)
5454
if err != nil {
@@ -69,7 +69,7 @@ func (m SQSMonitor) spotITNTerminationToInterruptionEvent(event EventBridgeEvent
6969
Description: fmt.Sprintf("Spot Interruption event received. Instance will be interrupted at %s \n", event.getTime()),
7070
}
7171
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
72-
errs := m.deleteMessages([]*sqs.Message{messages[0]})
72+
errs := m.deleteMessages([]*sqs.Message{message})
7373
if errs != nil {
7474
return errs[0]
7575
}

pkg/monitor/sqsevent/sqs-monitor.go

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -56,35 +56,48 @@ func (m SQSMonitor) Kind() string {
5656

5757
// Monitor continuously monitors SQS for events and sends interruption events to the passed in channel
5858
func (m SQSMonitor) Monitor() error {
59-
interruptionEvent, err := m.checkForSQSMessage()
59+
log.Debug().Msg("Checking for queue messages")
60+
messages, err := m.receiveQueueMessages(m.QueueURL)
6061
if err != nil {
61-
if errors.Is(err, ErrNodeStateNotRunning) {
62-
log.Warn().Err(err).Msg("dropping event for an already terminated node")
63-
return nil
64-
}
6562
return err
6663
}
67-
if interruptionEvent != nil && interruptionEvent.Kind == SQSTerminateKind {
68-
log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
69-
m.InterruptionChan <- *interruptionEvent
70-
}
71-
return nil
72-
}
7364

74-
// checkForSpotInterruptionNotice checks sqs for new messages and returns interruption events
75-
func (m SQSMonitor) checkForSQSMessage() (*monitor.InterruptionEvent, error) {
65+
failedEvents := 0
66+
for _, message := range messages {
67+
interruptionEvent, err := m.processSQSMessage(message)
68+
switch {
69+
case errors.Is(err, ErrNodeStateNotRunning):
70+
// If the node is no longer running, just log and delete the message. If message deletion fails, count it as an error.
71+
log.Warn().Err(err).Msg("dropping event for an already terminated node")
72+
errs := m.deleteMessages([]*sqs.Message{message})
73+
if len(errs) > 0 {
74+
log.Warn().Err(errs[0]).Msg("error deleting event for already terminated node")
75+
failedEvents++
76+
}
7677

77-
log.Debug().Msg("Checking for queue messages")
78-
messages, err := m.receiveQueueMessages(m.QueueURL)
79-
if err != nil {
80-
return nil, err
78+
case err != nil:
79+
// Log errors and record as failed events
80+
log.Warn().Err(err).Msg("ignoring event due to error")
81+
failedEvents++
82+
83+
case err == nil && interruptionEvent != nil && interruptionEvent.Kind == SQSTerminateKind:
84+
// Successfully processed SQS message into a SQSTerminateKind interruption event
85+
log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
86+
m.InterruptionChan <- *interruptionEvent
87+
}
8188
}
82-
if len(messages) == 0 {
83-
return nil, nil
89+
90+
if len(messages) > 0 && failedEvents == len(messages) {
91+
return fmt.Errorf("All of the waiting queue events could not be processed")
8492
}
8593

94+
return nil
95+
}
96+
97+
// processSQSMessage checks sqs for new messages and returns interruption events
98+
func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*monitor.InterruptionEvent, error) {
8699
event := EventBridgeEvent{}
87-
err = json.Unmarshal([]byte(*messages[0].Body), &event)
100+
err := json.Unmarshal([]byte(*message.Body), &event)
88101
if err != nil {
89102
return nil, err
90103
}
@@ -93,17 +106,17 @@ func (m SQSMonitor) checkForSQSMessage() (*monitor.InterruptionEvent, error) {
93106

94107
switch event.Source {
95108
case "aws.autoscaling":
96-
interruptionEvent, err = m.asgTerminationToInterruptionEvent(event, messages)
109+
interruptionEvent, err = m.asgTerminationToInterruptionEvent(event, message)
97110
if err != nil {
98111
return nil, err
99112
}
100113
case "aws.ec2":
101114
if event.DetailType == "EC2 Instance State-change Notification" {
102-
interruptionEvent, err = m.ec2StateChangeToInterruptionEvent(event, messages)
115+
interruptionEvent, err = m.ec2StateChangeToInterruptionEvent(event, message)
103116
} else if event.DetailType == "EC2 Spot Instance Interruption Warning" {
104-
interruptionEvent, err = m.spotITNTerminationToInterruptionEvent(event, messages)
117+
interruptionEvent, err = m.spotITNTerminationToInterruptionEvent(event, message)
105118
} else if event.DetailType == "EC2 Instance Rebalance Recommendation" {
106-
interruptionEvent, err = m.rebalanceRecommendationToInterruptionEvent(event, messages)
119+
interruptionEvent, err = m.rebalanceRecommendationToInterruptionEvent(event, message)
107120
}
108121
if err != nil {
109122
return nil, err
@@ -140,7 +153,7 @@ func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) {
140153
aws.String(sqs.QueueAttributeNameAll),
141154
},
142155
QueueUrl: &qURL,
143-
MaxNumberOfMessages: aws.Int64(2),
156+
MaxNumberOfMessages: aws.Int64(5),
144157
VisibilityTimeout: aws.Int64(20), // 20 seconds
145158
WaitTimeSeconds: aws.Int64(0),
146159
})

0 commit comments

Comments
 (0)