Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,17 +381,23 @@ public Message<MessageType> afterExecution(Message<MessageType> message) {

@Override
public Message<MessageType> onExecutionError(Message<MessageType> message, Throwable t) {
if (observationContext != null && ListenerExecutionFailedException.hasListenerException(t)) {
if (observationContext != null && MessageProcessingException.hasProcessingException(t)) {
Message<MessageType> failedMessage = Objects.requireNonNull(
ListenerExecutionFailedException.unwrapMessage(t),
"Message not found in Listener Exception.");
MessageProcessingException.unwrapMessage(t), "Message not found in processing exception.");
Message<MessageType> messageWithHeader = MessageHeaderUtils.addHeaderIfAbsent(failedMessage,
ObservationThreadLocalAccessor.KEY, observationContext);
throw new ListenerExecutionFailedException(t.getMessage(), t.getCause(), messageWithHeader);
throw rewrapWithUpdatedMessage(t, messageWithHeader);
}
return message;
}
}
}

private static <T> RuntimeException rewrapWithUpdatedMessage(Throwable t, Message<T> message) {
if (t instanceof InterceptorExecutionFailedException) {
return new InterceptorExecutionFailedException(t.getMessage(), t.getCause(), message);
}
return new ListenerExecutionFailedException(t.getMessage(), t.getCause(), message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;
import org.springframework.messaging.Message;

/**
* Exception thrown when a {@link io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor} throws during
* before-processing execution. Contains the {@link Message} instance or instances for which the interceptor failed,
* allowing downstream pipeline stages such as the error handler and acknowledgement handler to retrieve the message and
* act accordingly.
*
* @author Tomaz Fernandes
* @since 4.1
* @see MessageProcessingException
* @see ListenerExecutionFailedException
*/
public class InterceptorExecutionFailedException extends RuntimeException implements MessageProcessingException {

private final Collection<Message<?>> failedMessages;

public InterceptorExecutionFailedException(String message, Throwable cause, Message<?> failedMessage) {
super(message, cause);
this.failedMessages = Collections.singletonList(failedMessage);
}

public <T> InterceptorExecutionFailedException(String message, Throwable cause,
Collection<Message<T>> failedMessages) {
super(message, cause);
this.failedMessages = failedMessages.stream().map(msg -> (Message<?>) msg).collect(Collectors.toList());
}

@Override
public Collection<Message<?>> getFailedMessages() {
return this.failedMessages;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.util.Collections;
import java.util.stream.Collectors;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

Expand All @@ -31,9 +29,7 @@
* @author Tomaz Fernandes
* @since 3.0
*/
public class ListenerExecutionFailedException extends RuntimeException {

private static final Logger logger = LoggerFactory.getLogger(ListenerExecutionFailedException.class);
public class ListenerExecutionFailedException extends RuntimeException implements MessageProcessingException {

private final Collection<Message<?>> failedMessages;

Expand Down Expand Up @@ -66,57 +62,42 @@ public Collection<Message<?>> getFailedMessages() {
}

/**
* Look for a potentially nested {@link ListenerExecutionFailedException} and if found return the wrapped
* {@link Message} instance.
* Look for a potentially nested {@link MessageProcessingException} in the cause chain and if found return the
* wrapped {@link Message} instance.
* @param t the throwable
* @param <T> the message type.
* @return the message.
* @deprecated use {@link MessageProcessingException#unwrapMessage(Throwable)} instead.
*/
// @formatter:off
@SuppressWarnings("unchecked")
@Deprecated
@Nullable
public static <T> Message<T> unwrapMessage(Throwable t) {
Throwable exception = findListenerException(t);
return t == null
? null
: exception != null
? (Message<T>) ((ListenerExecutionFailedException) exception).getFailedMessage()
: (Message<T>) wrapAndRethrowError(t);
return MessageProcessingException.unwrapMessage(t);
}

/**
* Look for a potentially nested {@link ListenerExecutionFailedException} and if found return the wrapped {@link Message} instances.
* Look for a potentially nested {@link MessageProcessingException} in the cause chain and if found return the
* wrapped {@link Message} instances.
* @param t the throwable
* @param <T> the message type.
* @return the messages.
* @deprecated use {@link MessageProcessingException#unwrapMessages(Throwable)} instead.
*/
@SuppressWarnings("unchecked")
@Deprecated
@Nullable
public static <T> Collection<Message<T>> unwrapMessages(Throwable t) {
Throwable exception = findListenerException(t);
return t == null
? null
: exception != null
? ((ListenerExecutionFailedException) exception).getFailedMessages().stream().map(msg -> (Message<T>) msg).collect(Collectors.toList())
: (Collection<Message<T>>) wrapAndRethrowError(t);
}

@Nullable
private static Throwable findListenerException(Throwable t) {
return t == null
? null
: t instanceof ListenerExecutionFailedException
? t
: findListenerException(t.getCause());
}
// @formatter:on

private static Object wrapAndRethrowError(Throwable t) {
throw new IllegalArgumentException("No ListenerExecutionFailedException found to unwrap messages.", t);
return MessageProcessingException.unwrapMessages(t);
}

/**
* Check whether a {@link MessageProcessingException} is present anywhere in the cause chain of {@code t}.
* @param t the throwable.
* @return whether a {@link MessageProcessingException} is present.
* @deprecated use {@link MessageProcessingException#hasProcessingException(Throwable)} instead.
*/
@Deprecated
public static boolean hasListenerException(Throwable t) {
return findListenerException(t) != null;
return MessageProcessingException.hasProcessingException(t);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

import java.util.Collection;
import java.util.stream.Collectors;
import org.jspecify.annotations.Nullable;
import org.springframework.messaging.Message;

/**
* Implemented by exceptions that carry references to the {@link Message} instances that failed during processing.
* Provides static utility methods for traversing the cause chain and extracting message references.
*
* @author Tomaz Fernandes
* @since 4.1
* @see ListenerExecutionFailedException
* @see InterceptorExecutionFailedException
*/
public interface MessageProcessingException {

/**
* Return the messages for which processing failed.
* @return the messages.
*/
Collection<Message<?>> getFailedMessages();

/**
* Look for a potentially nested {@link MessageProcessingException} in the cause chain and if found return the
* wrapped {@link Message} instance.
* @param t the throwable
* @param <T> the message payload type.
* @return the message, or {@code null} if {@code t} is {@code null}.
*/
@SuppressWarnings("unchecked")
@Nullable
static <T> Message<T> unwrapMessage(@Nullable Throwable t) {
Throwable exception = findProcessingException(t);
return t == null ? null
: exception != null
? (Message<T>) ((MessageProcessingException) exception).getFailedMessages().iterator().next()
: (Message<T>) wrapAndRethrowError(t);
}

/**
* Look for a potentially nested {@link MessageProcessingException} in the cause chain and if found return the
* wrapped {@link Message} instances.
* @param t the throwable
* @param <T> the message payload type.
* @return the messages, or {@code null} if {@code t} is {@code null}.
*/
@SuppressWarnings("unchecked")
@Nullable
static <T> Collection<Message<T>> unwrapMessages(@Nullable Throwable t) {
Throwable exception = findProcessingException(t);
return t == null ? null
: exception != null
? ((MessageProcessingException) exception).getFailedMessages().stream()
.map(msg -> (Message<T>) msg).collect(Collectors.toList())
: (Collection<Message<T>>) wrapAndRethrowError(t);
}

/**
* Check whether a {@link MessageProcessingException} is present anywhere in the cause chain of {@code t}.
* @param t the throwable.
* @return {@code true} if a {@link MessageProcessingException} is present.
*/
static boolean hasProcessingException(Throwable t) {
return findProcessingException(t) != null;
}

@Nullable
private static Throwable findProcessingException(@Nullable Throwable t) {
return t == null ? null : t instanceof MessageProcessingException ? t : findProcessingException(t.getCause());
}

private static Object wrapAndRethrowError(Throwable t) {
throw new IllegalArgumentException("No MessageProcessingException found to unwrap messages.", t);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
package io.awspring.cloud.sqs.listener.pipeline;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.MessageProcessingException;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
Expand All @@ -43,7 +44,7 @@ public CompletableFuture<Message<T>> process(CompletableFuture<Message<T>> messa
return CompletableFutures.handleCompose(messageFuture,
(v, t) -> t == null
? applyInterceptors(v, null, getMessageInterceptors(context))
: applyInterceptors(ListenerExecutionFailedException.unwrapMessage(t), t, getMessageInterceptors(context))
: applyInterceptors(MessageProcessingException.unwrapMessage(t), t, getMessageInterceptors(context))
.thenCompose(msg -> CompletableFutures.failedFuture(t)));
}

Expand All @@ -64,11 +65,12 @@ public CompletableFuture<Collection<Message<T>>> processMany(
return CompletableFutures.handleCompose(messagesFuture,
(v, t) -> t == null
? applyInterceptors(v, null, getMessageInterceptors(context))
: applyInterceptors(ListenerExecutionFailedException.unwrapMessages(t), t, getMessageInterceptors(context))
// unwrapMessages is @Nullable but never returns null when t != null — it either returns the collection or throws
: applyInterceptors(MessageProcessingException.unwrapMessages(t), t, getMessageInterceptors(context))
.thenCompose(msg -> CompletableFutures.failedFuture(t)));
}

private CompletableFuture<Collection<Message<T>>> applyInterceptors(Collection<Message<T>> messages, Throwable t,
private CompletableFuture<Collection<Message<T>>> applyInterceptors(Collection<Message<T>> messages, @Nullable Throwable t,
Collection<AsyncMessageInterceptor<T>> messageInterceptors) {
return messageInterceptors.stream()
.reduce(CompletableFuture.<Void>completedFuture(null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package io.awspring.cloud.sqs.listener.pipeline;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.InterceptorExecutionFailedException;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.MessageProcessingException;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
Expand All @@ -40,20 +43,26 @@ public abstract class AbstractBeforeProcessingInterceptorExecutionStage<T> imple
@Override
public CompletableFuture<Message<T>> process(Message<T> message, MessageProcessingContext<T> context) {
logger.trace("Processing message {}", MessageHeaderUtils.getId(message));
return getInterceptors(context).stream().reduce(CompletableFuture.completedFuture(message), (messageFuture,
interceptor) -> messageFuture.thenCompose(interceptor::intercept).thenApply(validateMessageNotNull()),
(a, b) -> a);
return CompletableFutures.exceptionallyCompose(
getInterceptors(context).stream().reduce(CompletableFuture.completedFuture(message),
(messageFuture, interceptor) -> messageFuture.thenCompose(interceptor::intercept)
.thenApply(validateMessageNotNull()),
(a, b) -> a),
t -> CompletableFutures.failedFuture(MessageProcessingException.hasProcessingException(t) ? t
: new InterceptorExecutionFailedException("Interceptor threw an exception", t, message)));
}

@Override
public CompletableFuture<Collection<Message<T>>> process(Collection<Message<T>> messages,
MessageProcessingContext<T> context) {
logger.trace("Processing messages {}", MessageHeaderUtils.getId(messages));
return getInterceptors(context)
.stream().reduce(
CompletableFuture.completedFuture(messages), (messageFuture, interceptor) -> messageFuture
return CompletableFutures.exceptionallyCompose(
getInterceptors(context).stream().reduce(CompletableFuture.completedFuture(messages),
(messageFuture, interceptor) -> messageFuture
.thenCompose(interceptor::intercept).thenApply(validateMessagesNotEmpty()),
(a, b) -> a);
(a, b) -> a),
t -> CompletableFutures.failedFuture(MessageProcessingException.hasProcessingException(t) ? t
: new InterceptorExecutionFailedException("Interceptor threw an exception", t, messages)));
}

protected abstract Collection<AsyncMessageInterceptor<T>> getInterceptors(MessageProcessingContext<T> context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.awspring.cloud.sqs.listener.pipeline;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.MessageProcessingException;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementHandler;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -47,16 +47,17 @@ public CompletableFuture<Message<T>> process(CompletableFuture<Message<T>> messa
return CompletableFutures.handleCompose(messageFuture, (v, t) -> t == null
? this.acknowledgementHandler.onSuccess(v, context.getAcknowledgmentCallback()).thenApply(theVoid -> v)
: this.acknowledgementHandler
.onError(ListenerExecutionFailedException.unwrapMessage(t), t,
context.getAcknowledgmentCallback())
.onError(MessageProcessingException.unwrapMessage(t), t, context.getAcknowledgmentCallback())
.thenCompose(theVoid -> CompletableFutures.failedFuture(t)));
}

@Override
public CompletableFuture<Collection<Message<T>>> processMany(
CompletableFuture<Collection<Message<T>>> messagesFuture, MessageProcessingContext<T> context) {
return CompletableFutures.handleCompose(messagesFuture, (v, t) -> {
Collection<Message<T>> originalMessages = ListenerExecutionFailedException.unwrapMessages(t);
// unwrapMessages returns null when t is null, but originalMessages is only used in the t != null branch
// below
Collection<Message<T>> originalMessages = MessageProcessingException.unwrapMessages(t);
return t == null
? this.acknowledgementHandler.onSuccess(v, context.getAcknowledgmentCallback())
.thenApply(theVoid -> v)
Expand Down
Loading
Loading