Skip to content

Commit b7aedb5

Browse files
Fix SQS MessageInterceptor exceptions handling (#1600)
Closes #1595 When a MessageInterceptor throws an exception, the MessageListenerExecutionStage is never reached, so the exception never gets wrapped in ListenerExecutionFailedException. All downstream pipeline stages (ErrorHandler, after-processing interceptors, AcknowledgementHandler) expect to unwrap a message from the exception chain and fail with IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages. - Introduce MessageProcessingException interface as a common marker for exceptions that carry message references through the processing pipeline - Add InterceptorExecutionFailedException for failures in before-processing interceptors ListenerExecutionFailedException now implements MessageProcessingException; static utility methods deprecated in favor of MessageProcessingException equivalents - Wrap interceptor exceptions in InterceptorExecutionFailedException via exceptionallyCompose in AbstractBeforeProcessingInterceptorExecutionStage (both single and batch paths) - Update all downstream stages (ErrorHandlerExecutionStage, AbstractAfterProcessingInterceptorExecutionStage, AcknowledgementHandlerExecutionStage) to use MessageProcessingException for unwrapping - Update observation adapter in AsyncComponentAdapters to preserve exception type when re-wrapping with updated message headers
1 parent fc6298d commit b7aedb5

13 files changed

+452
-77
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,17 +381,23 @@ public Message<MessageType> afterExecution(Message<MessageType> message) {
381381

382382
@Override
383383
public Message<MessageType> onExecutionError(Message<MessageType> message, Throwable t) {
384-
if (observationContext != null && ListenerExecutionFailedException.hasListenerException(t)) {
384+
if (observationContext != null && MessageProcessingException.hasProcessingException(t)) {
385385
Message<MessageType> failedMessage = Objects.requireNonNull(
386-
ListenerExecutionFailedException.unwrapMessage(t),
387-
"Message not found in Listener Exception.");
386+
MessageProcessingException.unwrapMessage(t), "Message not found in processing exception.");
388387
Message<MessageType> messageWithHeader = MessageHeaderUtils.addHeaderIfAbsent(failedMessage,
389388
ObservationThreadLocalAccessor.KEY, observationContext);
390-
throw new ListenerExecutionFailedException(t.getMessage(), t.getCause(), messageWithHeader);
389+
throw rewrapWithUpdatedMessage(t, messageWithHeader);
391390
}
392391
return message;
393392
}
394393
}
395394
}
396395

396+
private static <T> RuntimeException rewrapWithUpdatedMessage(Throwable t, Message<T> message) {
397+
if (t instanceof InterceptorExecutionFailedException) {
398+
return new InterceptorExecutionFailedException(t.getMessage(), t.getCause(), message);
399+
}
400+
return new ListenerExecutionFailedException(t.getMessage(), t.getCause(), message);
401+
}
402+
397403
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.util.Collection;
19+
import java.util.Collections;
20+
import java.util.stream.Collectors;
21+
import org.springframework.messaging.Message;
22+
23+
/**
24+
* Exception thrown when a {@link io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor} throws during
25+
* before-processing execution. Contains the {@link Message} instance or instances for which the interceptor failed,
26+
* allowing downstream pipeline stages such as the error handler and acknowledgement handler to retrieve the message and
27+
* act accordingly.
28+
*
29+
* @author Tomaz Fernandes
30+
* @since 4.1
31+
* @see MessageProcessingException
32+
* @see ListenerExecutionFailedException
33+
*/
34+
public class InterceptorExecutionFailedException extends RuntimeException implements MessageProcessingException {
35+
36+
private final Collection<Message<?>> failedMessages;
37+
38+
public InterceptorExecutionFailedException(String message, Throwable cause, Message<?> failedMessage) {
39+
super(message, cause);
40+
this.failedMessages = Collections.singletonList(failedMessage);
41+
}
42+
43+
public <T> InterceptorExecutionFailedException(String message, Throwable cause,
44+
Collection<Message<T>> failedMessages) {
45+
super(message, cause);
46+
this.failedMessages = failedMessages.stream().map(msg -> (Message<?>) msg).collect(Collectors.toList());
47+
}
48+
49+
@Override
50+
public Collection<Message<?>> getFailedMessages() {
51+
return this.failedMessages;
52+
}
53+
54+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ListenerExecutionFailedException.java

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import java.util.Collections;
2020
import java.util.stream.Collectors;
2121
import org.jspecify.annotations.Nullable;
22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
2422
import org.springframework.messaging.Message;
2523
import org.springframework.util.Assert;
2624

@@ -31,9 +29,7 @@
3129
* @author Tomaz Fernandes
3230
* @since 3.0
3331
*/
34-
public class ListenerExecutionFailedException extends RuntimeException {
35-
36-
private static final Logger logger = LoggerFactory.getLogger(ListenerExecutionFailedException.class);
32+
public class ListenerExecutionFailedException extends RuntimeException implements MessageProcessingException {
3733

3834
private final Collection<Message<?>> failedMessages;
3935

@@ -66,57 +62,42 @@ public Collection<Message<?>> getFailedMessages() {
6662
}
6763

6864
/**
69-
* Look for a potentially nested {@link ListenerExecutionFailedException} and if found return the wrapped
70-
* {@link Message} instance.
65+
* Look for a potentially nested {@link MessageProcessingException} in the cause chain and if found return the
66+
* wrapped {@link Message} instance.
7167
* @param t the throwable
7268
* @param <T> the message type.
7369
* @return the message.
70+
* @deprecated use {@link MessageProcessingException#unwrapMessage(Throwable)} instead.
7471
*/
75-
// @formatter:off
76-
@SuppressWarnings("unchecked")
72+
@Deprecated
7773
@Nullable
7874
public static <T> Message<T> unwrapMessage(Throwable t) {
79-
Throwable exception = findListenerException(t);
80-
return t == null
81-
? null
82-
: exception != null
83-
? (Message<T>) ((ListenerExecutionFailedException) exception).getFailedMessage()
84-
: (Message<T>) wrapAndRethrowError(t);
75+
return MessageProcessingException.unwrapMessage(t);
8576
}
8677

8778
/**
88-
* Look for a potentially nested {@link ListenerExecutionFailedException} and if found return the wrapped {@link Message} instances.
79+
* Look for a potentially nested {@link MessageProcessingException} in the cause chain and if found return the
80+
* wrapped {@link Message} instances.
8981
* @param t the throwable
9082
* @param <T> the message type.
9183
* @return the messages.
84+
* @deprecated use {@link MessageProcessingException#unwrapMessages(Throwable)} instead.
9285
*/
93-
@SuppressWarnings("unchecked")
86+
@Deprecated
9487
@Nullable
9588
public static <T> Collection<Message<T>> unwrapMessages(Throwable t) {
96-
Throwable exception = findListenerException(t);
97-
return t == null
98-
? null
99-
: exception != null
100-
? ((ListenerExecutionFailedException) exception).getFailedMessages().stream().map(msg -> (Message<T>) msg).collect(Collectors.toList())
101-
: (Collection<Message<T>>) wrapAndRethrowError(t);
102-
}
103-
104-
@Nullable
105-
private static Throwable findListenerException(Throwable t) {
106-
return t == null
107-
? null
108-
: t instanceof ListenerExecutionFailedException
109-
? t
110-
: findListenerException(t.getCause());
111-
}
112-
// @formatter:on
113-
114-
private static Object wrapAndRethrowError(Throwable t) {
115-
throw new IllegalArgumentException("No ListenerExecutionFailedException found to unwrap messages.", t);
89+
return MessageProcessingException.unwrapMessages(t);
11690
}
11791

92+
/**
93+
* Check whether a {@link MessageProcessingException} is present anywhere in the cause chain of {@code t}.
94+
* @param t the throwable.
95+
* @return whether a {@link MessageProcessingException} is present.
96+
* @deprecated use {@link MessageProcessingException#hasProcessingException(Throwable)} instead.
97+
*/
98+
@Deprecated
11899
public static boolean hasListenerException(Throwable t) {
119-
return findListenerException(t) != null;
100+
return MessageProcessingException.hasProcessingException(t);
120101
}
121102

122103
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.util.Collection;
19+
import java.util.stream.Collectors;
20+
import org.jspecify.annotations.Nullable;
21+
import org.springframework.messaging.Message;
22+
23+
/**
24+
* Implemented by exceptions that carry references to the {@link Message} instances that failed during processing.
25+
* Provides static utility methods for traversing the cause chain and extracting message references.
26+
*
27+
* @author Tomaz Fernandes
28+
* @since 4.1
29+
* @see ListenerExecutionFailedException
30+
* @see InterceptorExecutionFailedException
31+
*/
32+
public interface MessageProcessingException {
33+
34+
/**
35+
* Return the messages for which processing failed.
36+
* @return the messages.
37+
*/
38+
Collection<Message<?>> getFailedMessages();
39+
40+
/**
41+
* Look for a potentially nested {@link MessageProcessingException} in the cause chain and if found return the
42+
* wrapped {@link Message} instance.
43+
* @param t the throwable
44+
* @param <T> the message payload type.
45+
* @return the message, or {@code null} if {@code t} is {@code null}.
46+
*/
47+
@SuppressWarnings("unchecked")
48+
@Nullable
49+
static <T> Message<T> unwrapMessage(@Nullable Throwable t) {
50+
Throwable exception = findProcessingException(t);
51+
return t == null ? null
52+
: exception != null
53+
? (Message<T>) ((MessageProcessingException) exception).getFailedMessages().iterator().next()
54+
: (Message<T>) wrapAndRethrowError(t);
55+
}
56+
57+
/**
58+
* Look for a potentially nested {@link MessageProcessingException} in the cause chain and if found return the
59+
* wrapped {@link Message} instances.
60+
* @param t the throwable
61+
* @param <T> the message payload type.
62+
* @return the messages, or {@code null} if {@code t} is {@code null}.
63+
*/
64+
@SuppressWarnings("unchecked")
65+
@Nullable
66+
static <T> Collection<Message<T>> unwrapMessages(@Nullable Throwable t) {
67+
Throwable exception = findProcessingException(t);
68+
return t == null ? null
69+
: exception != null
70+
? ((MessageProcessingException) exception).getFailedMessages().stream()
71+
.map(msg -> (Message<T>) msg).collect(Collectors.toList())
72+
: (Collection<Message<T>>) wrapAndRethrowError(t);
73+
}
74+
75+
/**
76+
* Check whether a {@link MessageProcessingException} is present anywhere in the cause chain of {@code t}.
77+
* @param t the throwable.
78+
* @return {@code true} if a {@link MessageProcessingException} is present.
79+
*/
80+
static boolean hasProcessingException(Throwable t) {
81+
return findProcessingException(t) != null;
82+
}
83+
84+
@Nullable
85+
private static Throwable findProcessingException(@Nullable Throwable t) {
86+
return t == null ? null : t instanceof MessageProcessingException ? t : findProcessingException(t.getCause());
87+
}
88+
89+
private static Object wrapAndRethrowError(Throwable t) {
90+
throw new IllegalArgumentException("No MessageProcessingException found to unwrap messages.", t);
91+
}
92+
93+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/pipeline/AbstractAfterProcessingInterceptorExecutionStage.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616
package io.awspring.cloud.sqs.listener.pipeline;
1717

1818
import io.awspring.cloud.sqs.CompletableFutures;
19-
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
2019
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
20+
import io.awspring.cloud.sqs.listener.MessageProcessingException;
2121
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
2222
import java.util.Collection;
2323
import java.util.concurrent.CompletableFuture;
24+
import org.jspecify.annotations.Nullable;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
2627
import org.springframework.messaging.Message;
@@ -43,7 +44,7 @@ public CompletableFuture<Message<T>> process(CompletableFuture<Message<T>> messa
4344
return CompletableFutures.handleCompose(messageFuture,
4445
(v, t) -> t == null
4546
? applyInterceptors(v, null, getMessageInterceptors(context))
46-
: applyInterceptors(ListenerExecutionFailedException.unwrapMessage(t), t, getMessageInterceptors(context))
47+
: applyInterceptors(MessageProcessingException.unwrapMessage(t), t, getMessageInterceptors(context))
4748
.thenCompose(msg -> CompletableFutures.failedFuture(t)));
4849
}
4950

@@ -64,11 +65,12 @@ public CompletableFuture<Collection<Message<T>>> processMany(
6465
return CompletableFutures.handleCompose(messagesFuture,
6566
(v, t) -> t == null
6667
? applyInterceptors(v, null, getMessageInterceptors(context))
67-
: applyInterceptors(ListenerExecutionFailedException.unwrapMessages(t), t, getMessageInterceptors(context))
68+
// unwrapMessages is @Nullable but never returns null when t != null — it either returns the collection or throws
69+
: applyInterceptors(MessageProcessingException.unwrapMessages(t), t, getMessageInterceptors(context))
6870
.thenCompose(msg -> CompletableFutures.failedFuture(t)));
6971
}
7072

71-
private CompletableFuture<Collection<Message<T>>> applyInterceptors(Collection<Message<T>> messages, Throwable t,
73+
private CompletableFuture<Collection<Message<T>>> applyInterceptors(Collection<Message<T>> messages, @Nullable Throwable t,
7274
Collection<AsyncMessageInterceptor<T>> messageInterceptors) {
7375
return messageInterceptors.stream()
7476
.reduce(CompletableFuture.<Void>completedFuture(null),

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/pipeline/AbstractBeforeProcessingInterceptorExecutionStage.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616
package io.awspring.cloud.sqs.listener.pipeline;
1717

18+
import io.awspring.cloud.sqs.CompletableFutures;
1819
import io.awspring.cloud.sqs.MessageHeaderUtils;
20+
import io.awspring.cloud.sqs.listener.InterceptorExecutionFailedException;
1921
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
22+
import io.awspring.cloud.sqs.listener.MessageProcessingException;
2023
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
2124
import java.util.Collection;
2225
import java.util.concurrent.CompletableFuture;
@@ -40,20 +43,26 @@ public abstract class AbstractBeforeProcessingInterceptorExecutionStage<T> imple
4043
@Override
4144
public CompletableFuture<Message<T>> process(Message<T> message, MessageProcessingContext<T> context) {
4245
logger.trace("Processing message {}", MessageHeaderUtils.getId(message));
43-
return getInterceptors(context).stream().reduce(CompletableFuture.completedFuture(message), (messageFuture,
44-
interceptor) -> messageFuture.thenCompose(interceptor::intercept).thenApply(validateMessageNotNull()),
45-
(a, b) -> a);
46+
return CompletableFutures.exceptionallyCompose(
47+
getInterceptors(context).stream().reduce(CompletableFuture.completedFuture(message),
48+
(messageFuture, interceptor) -> messageFuture.thenCompose(interceptor::intercept)
49+
.thenApply(validateMessageNotNull()),
50+
(a, b) -> a),
51+
t -> CompletableFutures.failedFuture(MessageProcessingException.hasProcessingException(t) ? t
52+
: new InterceptorExecutionFailedException("Interceptor threw an exception", t, message)));
4653
}
4754

4855
@Override
4956
public CompletableFuture<Collection<Message<T>>> process(Collection<Message<T>> messages,
5057
MessageProcessingContext<T> context) {
5158
logger.trace("Processing messages {}", MessageHeaderUtils.getId(messages));
52-
return getInterceptors(context)
53-
.stream().reduce(
54-
CompletableFuture.completedFuture(messages), (messageFuture, interceptor) -> messageFuture
59+
return CompletableFutures.exceptionallyCompose(
60+
getInterceptors(context).stream().reduce(CompletableFuture.completedFuture(messages),
61+
(messageFuture, interceptor) -> messageFuture
5562
.thenCompose(interceptor::intercept).thenApply(validateMessagesNotEmpty()),
56-
(a, b) -> a);
63+
(a, b) -> a),
64+
t -> CompletableFutures.failedFuture(MessageProcessingException.hasProcessingException(t) ? t
65+
: new InterceptorExecutionFailedException("Interceptor threw an exception", t, messages)));
5766
}
5867

5968
protected abstract Collection<AsyncMessageInterceptor<T>> getInterceptors(MessageProcessingContext<T> context);

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/pipeline/AcknowledgementHandlerExecutionStage.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
package io.awspring.cloud.sqs.listener.pipeline;
1717

1818
import io.awspring.cloud.sqs.CompletableFutures;
19-
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
2019
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
20+
import io.awspring.cloud.sqs.listener.MessageProcessingException;
2121
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementHandler;
2222
import java.util.Collection;
2323
import java.util.concurrent.CompletableFuture;
@@ -47,16 +47,17 @@ public CompletableFuture<Message<T>> process(CompletableFuture<Message<T>> messa
4747
return CompletableFutures.handleCompose(messageFuture, (v, t) -> t == null
4848
? this.acknowledgementHandler.onSuccess(v, context.getAcknowledgmentCallback()).thenApply(theVoid -> v)
4949
: this.acknowledgementHandler
50-
.onError(ListenerExecutionFailedException.unwrapMessage(t), t,
51-
context.getAcknowledgmentCallback())
50+
.onError(MessageProcessingException.unwrapMessage(t), t, context.getAcknowledgmentCallback())
5251
.thenCompose(theVoid -> CompletableFutures.failedFuture(t)));
5352
}
5453

5554
@Override
5655
public CompletableFuture<Collection<Message<T>>> processMany(
5756
CompletableFuture<Collection<Message<T>>> messagesFuture, MessageProcessingContext<T> context) {
5857
return CompletableFutures.handleCompose(messagesFuture, (v, t) -> {
59-
Collection<Message<T>> originalMessages = ListenerExecutionFailedException.unwrapMessages(t);
58+
// unwrapMessages returns null when t is null, but originalMessages is only used in the t != null branch
59+
// below
60+
Collection<Message<T>> originalMessages = MessageProcessingException.unwrapMessages(t);
6061
return t == null
6162
? this.acknowledgementHandler.onSuccess(v, context.getAcknowledgmentCallback())
6263
.thenApply(theVoid -> v)

0 commit comments

Comments
 (0)