Exceptions are not handled when thrown in a SQS MessageInterceptor #1595

@raihan-ste

Type: Bug

Component: SQS
Describe the bug
Possibly related to #1379.

I'm not sure whether this is expected behavior. Exceptions thrown in io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor instances are not passed to io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler instances. Instead, the pipeline fails with java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages. I'd like to use an interceptor to drop messages that have already been processed. My plan was to throw an exception in the interceptor and then catch it in an error handler.

I'm using io.awspring.cloud:spring-cloud-aws-starter-sqs version 4.0.0.

java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
	at java.base/java.util.concurrent.CompletableFuture.wrapInCompletionException(CompletableFuture.java:323)
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:359)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:364)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:957)
	at java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:971)
	at java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2399)
	at io.awspring.cloud.sqs.CompletableFutures.handleCompose(CompletableFutures.java:73)
	at io.awspring.cloud.sqs.listener.pipeline.AcknowledgementHandlerExecutionStage.process(AcknowledgementHandlerExecutionStage.java:47)
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104)
	at io.awspring.cloud.sqs.listener.sink.AbstractMessageProcessingPipelineSink.lambda$execute$0(AbstractMessageProcessingPipelineSink.java:135)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1789)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
	at java.base/java.lang.Thread.run(Thread.java:1474)
Caused by: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
	at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.wrapAndRethrowError(ListenerExecutionFailedException.java:115)
	at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.unwrapMessage(ListenerExecutionFailedException.java:84)
	at io.awspring.cloud.sqs.listener.pipeline.AcknowledgementHandlerExecutionStage.lambda$process$2(AcknowledgementHandlerExecutionStage.java:50)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:955)
	... 10 common frames omitted
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
	at java.base/java.util.concurrent.CompletableFuture.wrapInCompletionException(CompletableFuture.java:323)
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:359)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:364)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:957)
	at java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:971)
	at java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2399)
	at io.awspring.cloud.sqs.CompletableFutures.handleCompose(CompletableFutures.java:73)
	at io.awspring.cloud.sqs.listener.pipeline.AbstractAfterProcessingInterceptorExecutionStage.process(AbstractAfterProcessingInterceptorExecutionStage.java:43)
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104)
	... 6 common frames omitted
Caused by: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
	at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.wrapAndRethrowError(ListenerExecutionFailedException.java:115)
	at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.unwrapMessage(ListenerExecutionFailedException.java:84)
	at io.awspring.cloud.sqs.listener.pipeline.AbstractAfterProcessingInterceptorExecutionStage.lambda$process$1(AbstractAfterProcessingInterceptorExecutionStage.java:46)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:955)
	... 11 common frames omitted
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
	at java.base/java.util.concurrent.CompletableFuture.wrapInCompletionException(CompletableFuture.java:323)
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:359)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:364)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:957)
	at java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:971)
	at java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2399)
	at io.awspring.cloud.sqs.CompletableFutures.handleCompose(CompletableFutures.java:73)
	at io.awspring.cloud.sqs.listener.pipeline.AbstractAfterProcessingInterceptorExecutionStage.process(AbstractAfterProcessingInterceptorExecutionStage.java:43)
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104)
	... 7 common frames omitted
Caused by: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
	at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.wrapAndRethrowError(ListenerExecutionFailedException.java:115)
	at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.unwrapMessage(ListenerExecutionFailedException.java:84)
	at io.awspring.cloud.sqs.listener.pipeline.AbstractAfterProcessingInterceptorExecutionStage.lambda$process$1(AbstractAfterProcessingInterceptorExecutionStage.java:46)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:955)
	... 12 common frames omitted
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
	at java.base/java.util.concurrent.CompletableFuture.wrapInCompletionException(CompletableFuture.java:323)
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:359)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:364)
	at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:1015)
	at java.base/java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:1029)
	at java.base/java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2423)
	at io.awspring.cloud.sqs.CompletableFutures.exceptionallyCompose(CompletableFutures.java:57)
	at io.awspring.cloud.sqs.listener.pipeline.ErrorHandlerExecutionStage.process(ErrorHandlerExecutionStage.java:50)
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104)
	... 8 common frames omitted
Caused by: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
	at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.wrapAndRethrowError(ListenerExecutionFailedException.java:115)
	at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.unwrapMessage(ListenerExecutionFailedException.java:84)
	at io.awspring.cloud.sqs.listener.pipeline.ErrorHandlerExecutionStage.lambda$process$0(ErrorHandlerExecutionStage.java:51)
	at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:1011)
	... 13 common frames omitted
Caused by: java.util.concurrent.CompletionException: io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException: Error executing action in BlockingMessageInterceptorAdapter
	at java.base/java.util.concurrent.CompletableFuture.wrapInCompletionException(CompletableFuture.java:323)
	at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:412)
	at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1210)
	at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2368)
	at io.awspring.cloud.sqs.listener.pipeline.AbstractBeforeProcessingInterceptorExecutionStage.lambda$process$0(AbstractBeforeProcessingInterceptorExecutionStage.java:44)
	at java.base/java.util.stream.ReduceOps$1ReducingSink.accept(ReduceOps.java:80)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1716)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:570)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
	at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:703)
	at io.awspring.cloud.sqs.listener.pipeline.AbstractBeforeProcessingInterceptorExecutionStage.process(AbstractBeforeProcessingInterceptorExecutionStage.java:43)
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$ComposingMessagePipelineStage.lambda$process$0(MessageProcessingPipelineBuilder.java:80)
	at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1208)
	at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2368)
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$ComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:80)
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$ComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:80)
	... 9 common frames omitted
Caused by: io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException: Error executing action in BlockingMessageInterceptorAdapter
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.wrapWithBlockingException(AsyncComponentAdapters.java:178)
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.supplyInSameThread(AsyncComponentAdapters.java:160)
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.execute(AsyncComponentAdapters.java:117)
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$BlockingMessageInterceptorAdapter.intercept(AsyncComponentAdapters.java:231)
	at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1208)
	... 24 common frames omitted
Caused by: com.test.sqstest.MessageAlreadyProcessedException
	at com.test.sqstest.TestInterceptor.intercept(TestInterceptor.kt:25)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:565)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:359)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:158)
	at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:133)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:371)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:130)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:719)
	at com.test.sqstest.TestInterceptor$$SpringCGLIB$$0.intercept(<generated>)
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.lambda$withFunctionThreadLocalScope$2(AsyncComponentAdapters.java:187)
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.supplyInSameThread(AsyncComponentAdapters.java:157)
	... 27 common frames omitted
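
Reading the trace from the bottom up: the interceptor's exception is wrapped in AsyncAdapterBlockingExecutionFailedException, and each later stage (ErrorHandlerExecutionStage, the after-processing interceptor stages, AcknowledgementHandlerExecutionStage) then fails inside ListenerExecutionFailedException.unwrapMessage. The sketch below is not the library's code. It is a minimal, self-contained model of that pattern, assuming the unwrap step walks the cause chain looking for one specific wrapper type. ListenerFailure, AdapterFailure, unwrapPayload and describe are hypothetical stand-ins introduced only for illustration.

```kotlin
import java.util.concurrent.CompletionException

// Hypothetical stand-in for the wrapper that carries the failed message.
class ListenerFailure(val payload: String, cause: Throwable) : RuntimeException(cause)

// Hypothetical stand-in for the wrapper seen around the interceptor's exception.
class AdapterFailure(cause: Throwable) : RuntimeException(cause)

class MessageAlreadyProcessedException(id: String) : RuntimeException("already processed: $id")

// Walks the cause chain and returns the payload of the first ListenerFailure found.
// Throws IllegalArgumentException when the chain contains no such wrapper.
fun unwrapPayload(t: Throwable): String {
    var current: Throwable? = t
    while (current != null) {
        if (current is ListenerFailure) return current.payload
        current = current.cause
    }
    throw IllegalArgumentException("No ListenerFailure found to unwrap messages.", t)
}

// Reports whether a handler could be given the message for this failure.
fun describe(t: Throwable): String =
    try {
        "handler would receive '${unwrapPayload(t)}'"
    } catch (e: IllegalArgumentException) {
        "unwrap failed: ${e.message}"
    }

fun main() {
    val root = MessageAlreadyProcessedException("42")
    // Failure wrapped the way the unwrap step expects.
    println(describe(CompletionException(ListenerFailure("msg-42", root))))
    // Failure wrapped the way the interceptor failure appears in the trace.
    println(describe(CompletionException(AdapterFailure(root))))
}
```

Under these assumptions the first call finds the message and the second fails during unwrapping, which would explain why the error handler in the sample is never invoked with the original exception.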

Sample

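import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor
import org.springframework.messaging.Message
import org.springframework.stereotype.Component

@Component
class TestInterceptor : MessageInterceptor<Any> {
    override fun intercept(message: Message<Any>): Message<Any> {
        val messageId = message.headers.id!!

        // claimMessage is not shown here; a result of 0 is treated as "already processed"
        if (claimMessage(messageId) == 0) {
            throw MessageAlreadyProcessedException(messageId)
        }
        return message
    }
}

@Component
class TestErrorHandler : ErrorHandler<Any> {
    // never called
    override fun handle(message: Message<Any>, t: Throwable) {
        if (t is MessageAlreadyProcessedException) {
            // logger is not shown here; the ID is read from the message headers
            logger.info("Message with id ${message.headers.id} already processed")
        } else {
            throw t
        }
    }
}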
