Skip to content

Commit 273f8f9

Browse files
author
João Victor Calassio
authored
Add acknowledgement mode on @SqsListener annotation (#870)
* Add ack mode parameter to SqsListener annotation Fixes gh-761 * Remove unrelated file from PR * Fix style differences * Rename acknowledgement enum to be consistent with other nomenclatures * Update docs with @SqsListener acknowledgement mode param * Add JavaDoc for method and test enum value - Added missing JavaDoc for getAcknowledgementMode public method - Added test to ensure that new SqsListenerAcknowledgementMode enum will have the same values as the original AcknowledgementMode enum * Change annotation ack from enum to string - Change SqsListenerAcknowledgementMode from enum to class with const strings - Change typo on "acknowledgement" annotation field - Remove unnecessary field on SqsContainerOptionsBuilder - Move integration tests to its own test suite * Update documentation and code comments - Update docs to reflect new "DEFAULT" annotation ack mode - Update comments with latest changes * Set empty string as default behavior - Remove "DEFAULT" annotation acknowledgement mode, use empty string as default - Update docs * Add author after latest changes - Add author on modified classes * Fix inverted condition when resolving AcknowledgementMode value
1 parent f7eb0ab commit 273f8f9

File tree

9 files changed

+572
-16
lines changed

9 files changed

+572
-16
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,9 @@ Note that if there are messages available the call may return earlier than this
636636
- `messageVisibilitySeconds` - Set the minimum visibility for the messages retrieved in a poll.
637637
Note that for `FIFO` single message listener methods, this visibility is applied to the whole batch before each message is sent to the listener.
638638
See <<FIFO Support>> for more information.
639-
639+
- `acknowledgementMode` - Set the acknowledgement mode for the container.
640+
If any value is set, it will take precedence over the acknowledgement mode defined for the container factory options.
641+
See <<Acknowledgement Mode>> for more information.
640642

641643
===== Listener Method Arguments
642644

@@ -1326,6 +1328,8 @@ NOTE: All options are available for both `single message` and `batch` message li
13261328
- `ALWAYS` - Acknowledges a message or batch of messages after processing returns success or error.
13271329
- `MANUAL` - The framework won't acknowledge messages automatically and `Acknowledgement` objects can be received in the listener method.
13281330

1331+
The `Acknowledgement` strategy can be configured in the `SqsContainerOptions` or in the `@SqsListener` annotation.
1332+
13291333
==== Acknowledgement Batching
13301334

13311335
The `acknowledgementInterval` and `acknowledgementThreshold` options enable acknowledgement batching.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/AbstractListenerAnnotationBeanPostProcessor.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,10 @@
2222
import io.awspring.cloud.sqs.config.HandlerMethodEndpoint;
2323
import io.awspring.cloud.sqs.config.SqsEndpoint;
2424
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
25+
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
2526
import io.awspring.cloud.sqs.support.resolver.AcknowledgmentHandlerMethodArgumentResolver;
2627
import io.awspring.cloud.sqs.support.resolver.BatchAcknowledgmentArgumentResolver;
2728
import io.awspring.cloud.sqs.support.resolver.BatchPayloadMethodArgumentResolver;
28-
import java.lang.annotation.Annotation;
29-
import java.lang.reflect.Method;
30-
import java.util.ArrayList;
31-
import java.util.Arrays;
32-
import java.util.Collection;
33-
import java.util.Collections;
34-
import java.util.HashSet;
35-
import java.util.List;
36-
import java.util.Map;
37-
import java.util.concurrent.atomic.AtomicInteger;
38-
import java.util.stream.Collectors;
39-
import java.util.stream.Stream;
40-
import java.util.stream.StreamSupport;
4129
import org.springframework.aop.support.AopUtils;
4230
import org.springframework.beans.BeansException;
4331
import org.springframework.beans.factory.BeanFactory;
@@ -69,11 +57,26 @@
6957
import org.springframework.util.Assert;
7058
import org.springframework.util.StringUtils;
7159

60+
import java.lang.annotation.Annotation;
61+
import java.lang.reflect.Method;
62+
import java.util.ArrayList;
63+
import java.util.Arrays;
64+
import java.util.Collection;
65+
import java.util.Collections;
66+
import java.util.HashSet;
67+
import java.util.List;
68+
import java.util.Map;
69+
import java.util.concurrent.atomic.AtomicInteger;
70+
import java.util.stream.Collectors;
71+
import java.util.stream.Stream;
72+
import java.util.stream.StreamSupport;
73+
7274
/**
7375
* {@link BeanPostProcessor} implementation that scans beans for a {@link SqsListener @SqsListener} annotation, extracts
7476
* information to a {@link SqsEndpoint}, and registers it in the {@link EndpointRegistrar}.
7577
*
7678
* @author Tomaz Fernandes
79+
* @author Joao Calassio
7780
* @since 3.0
7881
*/
7982
public abstract class AbstractListenerAnnotationBeanPostProcessor<A extends Annotation>
@@ -219,6 +222,17 @@ protected Integer resolveAsInteger(String value, String propertyName) {
219222
}
220223
}
221224

225+
@Nullable
226+
protected AcknowledgementMode resolveAcknowledgement(String value) {
227+
try {
228+
final String resolvedValue = resolveAsString(value, "acknowledgementMode");
229+
return StringUtils.hasText(resolvedValue) ? AcknowledgementMode.valueOf(resolvedValue) : null;
230+
}
231+
catch (IllegalArgumentException e) {
232+
throw new IllegalArgumentException("Cannot resolve " + value + " as AcknowledgementMode", e);
233+
}
234+
}
235+
222236
protected String getEndpointId(String id) {
223237
if (StringUtils.hasText(id)) {
224238
return resolveAsString(id, "id");

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
* @author Alain Sahli
7575
* @author Matej Nedic
7676
* @author Tomaz Fernandes
77+
* @author Joao Calassio
7778
* @since 1.1
7879
*/
7980
@Target(ElementType.METHOD)
@@ -137,4 +138,10 @@
137138
*/
138139
String messageVisibilitySeconds() default "";
139140

141+
/**
142+
* The acknowledgement mode to be used for the provided queues. If not specified, the acknowledgement mode defined
143+
* for the container factory will be used.
144+
*/
145+
String acknowledgementMode() default "";
146+
140147
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2013-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.annotation;
17+
18+
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AlwaysAcknowledgementHandler;
19+
import io.awspring.cloud.sqs.listener.acknowledgement.handler.NeverAcknowledgementHandler;
20+
import io.awspring.cloud.sqs.listener.acknowledgement.handler.OnSuccessAcknowledgementHandler;
21+
22+
/**
23+
* Acknowledgement strategies supported by the {@link SqsListener} annotation.
24+
*
25+
* @author Joao Calassio
26+
* @since 3.1
27+
* @see OnSuccessAcknowledgementHandler
28+
* @see AlwaysAcknowledgementHandler
29+
* @see NeverAcknowledgementHandler
30+
* @see io.awspring.cloud.sqs.listener.ContainerOptions
31+
* @see SqsListener
32+
*/
33+
public class SqsListenerAcknowledgementMode {
34+
35+
/**
36+
* Messages will be acknowledged when message processing is successful.
37+
*/
38+
public static final String ON_SUCCESS = "ON_SUCCESS";
39+
40+
/**
41+
* Messages will be acknowledged whether processing was completed successfully or with an error.
42+
*/
43+
public static final String ALWAYS = "ALWAYS";
44+
45+
/**
46+
* Messages will not be acknowledged automatically by the container.
47+
* @see io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement
48+
*/
49+
public static final String MANUAL = "MANUAL";
50+
51+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* {@link AbstractListenerAnnotationBeanPostProcessor} implementation for {@link SqsListener @SqsListener}.
3131
*
3232
* @author Tomaz Fernandes
33+
* @author Joao Calassio
3334
* @since 3.0
3435
*/
3536
public class SqsListenerAnnotationBeanPostProcessor extends AbstractListenerAnnotationBeanPostProcessor<SqsListener> {
@@ -51,7 +52,7 @@ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) {
5152
resolveAsInteger(sqsListenerAnnotation.maxConcurrentMessages(), "maxConcurrentMessages"))
5253
.messageVisibility(
5354
resolveAsInteger(sqsListenerAnnotation.messageVisibilitySeconds(), "messageVisibility"))
54-
.build();
55+
.acknowledgementMode(resolveAcknowledgement(sqsListenerAnnotation.acknowledgementMode())).build();
5556
}
5657

5758
@Override

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.awspring.cloud.sqs.config;
1717

1818
import io.awspring.cloud.sqs.annotation.SqsListener;
19+
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
1920
import java.time.Duration;
2021
import java.util.Collection;
2122
import org.springframework.lang.Nullable;
@@ -26,6 +27,7 @@
2627
* Contains properties that should be mapped from {@link SqsListener @SqsListener} annotations.
2728
*
2829
* @author Tomaz Fernandes
30+
* @author Joao Calassio
2931
* @since 3.0
3032
*/
3133
public class SqsEndpoint extends AbstractEndpoint {
@@ -38,12 +40,16 @@ public class SqsEndpoint extends AbstractEndpoint {
3840

3941
private final Integer maxMessagesPerPoll;
4042

43+
@Nullable
44+
private final AcknowledgementMode acknowledgementMode;
45+
4146
protected SqsEndpoint(SqsEndpointBuilder builder) {
4247
super(builder.queueNames, builder.factoryName, builder.id);
4348
this.maxConcurrentMessages = builder.maxConcurrentMessages;
4449
this.pollTimeoutSeconds = builder.pollTimeoutSeconds;
4550
this.messageVisibility = builder.messageVisibility;
4651
this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
52+
this.acknowledgementMode = builder.acknowledgementMode;
4753
}
4854

4955
/**
@@ -91,6 +97,15 @@ public Duration getMessageVisibility() {
9197
return this.messageVisibility != null ? Duration.ofSeconds(this.messageVisibility) : null;
9298
}
9399

100+
/**
101+
* Returns the acknowledgement mode configured for this endpoint.
102+
* @return the acknowledgement mode.
103+
*/
104+
@Nullable
105+
public AcknowledgementMode getAcknowledgementMode() {
106+
return this.acknowledgementMode;
107+
}
108+
94109
public static class SqsEndpointBuilder {
95110

96111
private Collection<String> queueNames;
@@ -107,6 +122,9 @@ public static class SqsEndpointBuilder {
107122

108123
private Integer maxMessagesPerPoll;
109124

125+
@Nullable
126+
private AcknowledgementMode acknowledgementMode;
127+
110128
public SqsEndpointBuilder queueNames(Collection<String> queueNames) {
111129
this.queueNames = queueNames;
112130
return this;
@@ -142,6 +160,11 @@ public SqsEndpointBuilder id(String id) {
142160
return this;
143161
}
144162

163+
public SqsEndpointBuilder acknowledgementMode(@Nullable AcknowledgementMode acknowledgementMode) {
164+
this.acknowledgementMode = acknowledgementMode;
165+
return this;
166+
}
167+
145168
public SqsEndpoint build() {
146169
return new SqsEndpoint(this);
147170
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
* used.
123123
*
124124
* @author Tomaz Fernandes
125+
* @author Joao Calassio
125126
* @since 3.0
126127
* @see SqsMessageListenerContainer
127128
* @see ContainerOptions
@@ -161,7 +162,8 @@ private void configureFromSqsEndpoint(SqsEndpoint sqsEndpoint, SqsContainerOptio
161162
ConfigUtils.INSTANCE.acceptIfNotNull(sqsEndpoint.getMaxConcurrentMessages(), options::maxConcurrentMessages)
162163
.acceptIfNotNull(sqsEndpoint.getMaxMessagesPerPoll(), options::maxMessagesPerPoll)
163164
.acceptIfNotNull(sqsEndpoint.getPollTimeout(), options::pollTimeout)
164-
.acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility);
165+
.acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility)
166+
.acceptIfNotNull(sqsEndpoint.getAcknowledgementMode(), options::acknowledgementMode);
165167
}
166168

167169
/**
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2013-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.annotation;
17+
18+
import static org.junit.jupiter.api.Assertions.assertEquals;
19+
20+
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
21+
import java.lang.reflect.Field;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.EnumSource;
24+
25+
/**
26+
* Tests for {@link SqsListenerAcknowledgementMode} enum values
27+
*
28+
* @author Joao Calassio
29+
*/
30+
class SqsListenerAcknowledgementModeTests {
31+
32+
@ParameterizedTest
33+
@EnumSource(AcknowledgementMode.class)
34+
void shouldHaveAllValuesOfAcknowledgementModeEnum(final AcknowledgementMode acknowledgementMode)
35+
throws NoSuchFieldException, IllegalAccessException {
36+
Class<SqsListenerAcknowledgementMode> clz = SqsListenerAcknowledgementMode.class;
37+
Field correspondingValue = clz.getDeclaredField(acknowledgementMode.name());
38+
assertEquals(acknowledgementMode.name(), correspondingValue.get(clz));
39+
}
40+
41+
}

0 commit comments

Comments
 (0)