Description
Describe the bug
When receiving shutdown signal, consumer beans are continuing to consume events, even after expected InputBindingLifecycle
phase of Integer.MAX_VALUE - 2048
(a guess).
Additionally output binders are disconnected earlier then input binders.
Went though stack trace and code and found ServiceBusInboundChannelAdapter
does not override doStop
method, although that bean is using different phase. Which looks weird and more confusing to answer question at which phase should binder stop accepting new events.
Exception or Stack Trace
2025-04-14T13:45:07.550+03:00 INFO 33481 --- [ receiverPump-2] c.s.demo.consumer.ShutdownTestConsumer : Consuming test event: 0069, source: test
2025-04-14T13:45:08.053+03:00 INFO 33481 --- [ receiverPump-2] c.s.demo.consumer.ShutdownTestConsumer : Triggering new event from consumer: 0069-out
2025-04-14T13:45:08.054+03:00 INFO 33481 --- [ receiverPump-2] c.a.m.s.ServiceBusSenderAsyncClient : {"az.sdk.message":"Sending batch.","batchSize":1}
2025-04-14T13:45:08.056+03:00 INFO 33481 --- [ionShutdownHook] .s.i.s.i.ServiceBusInboundChannelAdapter : stopped com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter@1c0e4262
2025-04-14T13:45:08.057+03:00 INFO 33481 --- [ionShutdownHook] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel 1213485528.shutdown-test.errors
2025-04-14T13:45:08.064+03:00 INFO 33481 --- [ionShutdownHook] o.s.c.stream.binder.BinderErrorChannel : Channel 'application.1213485528.shutdown-test.errors' has 1 subscriber(s).
2025-04-14T13:45:08.064+03:00 INFO 33481 --- [ionShutdownHook] o.s.c.stream.binder.BinderErrorChannel : Channel 'application.1213485528.shutdown-test.errors' has 0 subscriber(s).
2025-04-14T13:45:08.064+03:00 INFO 33481 --- [ionShutdownHook] o.s.i.endpoint.ReactiveStreamsConsumer : stopped bean 'produceShutdownTest_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2025-04-14T13:45:08.064+03:00 INFO 33481 --- [ionShutdownHook] com.shut.demo.EventBindingsLifecycle : Received shutdown signal.
2025-04-14T13:45:08.064+03:00 INFO 33481 --- [ionShutdownHook] com.shut.demo.EventBindingsLifecycle : Sleeping for PT15S seconds after inbound binders should be shutdown.
2025-04-14T13:45:08.208+03:00 INFO 33481 --- [ receiverPump-2] c.s.demo.consumer.ShutdownTestConsumer : Consuming test event: 0070, source: test
2025-04-14T13:45:08.710+03:00 INFO 33481 --- [ receiverPump-2] c.s.demo.consumer.ShutdownTestConsumer : Triggering new event from consumer: 0070-out
2025-04-14T13:45:08.712+03:00 WARN 33481 --- [ receiverPump-2] o.s.i.channel.FluxMessageChannel : Error during processing event: GenericMessage [payload=byte[37], headers={contentType=application/json, id=b178bed1-eb11-43c9-bd46-fe88ab13f1a9, timestamp=1744627508711}]
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'produceShutdownTest_integrationflow.channel#0'
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:405) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:334) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:304) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.FluxMessageChannel.sendReactiveMessage(FluxMessageChannel.java:165) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$4(FluxMessageChannel.java:148) ~[spring-integration-core-6.4.3.jar:6.4.3]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:179) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:260) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:968) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:164) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.Mono.subscribe(Mono.java:4576) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxDelaySubscription$DelaySubscriptionMainSubscriber.onNext(FluxDelaySubscription.java:189) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:284) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:365) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.SinkManyUnicast.tryEmitNext(SinkManyUnicast.java:239) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) ~[reactor-core-3.7.4.jar:3.7.4]
at com.shut.demo.consumer.ShutdownTestConsumer.emitEventIfNeeded(ShutdownTestConsumer.java:33) ~[main/:na]
at com.shut.demo.consumer.ShutdownTestConsumer.accept(ShutdownTestConsumer.java:25) ~[main/:na]
at com.shut.demo.consumer.ShutdownTestConsumer.accept(ShutdownTestConsumer.java:13) ~[main/:na]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1064) ~[spring-cloud-function-context-4.2.2.jar:4.2.2]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:762) ~[spring-cloud-function-context-4.2.2.jar:4.2.2]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:593) ~[spring-cloud-function-context-4.2.2.jar:4.2.2]
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:92) ~[spring-cloud-stream-4.2.1.jar:4.2.1]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:823) ~[spring-cloud-stream-4.2.1.jar:4.2.1]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:661) ~[spring-cloud-stream-4.2.1.jar:4.2.1]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:334) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:304) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.2.5.jar:6.2.5]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.2.5.jar:6.2.5]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.2.5.jar:6.2.5]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.2.5.jar:6.2.5]
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:261) ~[spring-integration-core-6.4.3.jar:6.4.3]
at io.micrometer.observation.Observation.observe(Observation.java:498) ~[micrometer-observation-1.14.5.jar:1.14.5]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:261) ~[spring-integration-core-6.4.3.jar:6.4.3]
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$000(ServiceBusInboundChannelAdapter.java:73) ~[spring-integration-azure-servicebus-5.22.0.jar:5.22.0]
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:202) ~[spring-integration-azure-servicebus-5.22.0.jar:5.22.0]
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:187) ~[spring-integration-azure-servicebus-5.22.0.jar:5.22.0]
at com.azure.messaging.servicebus.MessagePump.notifyMessage(MessagePump.java:163) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at com.azure.messaging.servicebus.MessagePump.lambda$handleMessage$3(MessagePump.java:148) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.wrap(ServiceBusReceiverInstrumentation.java:176) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:106) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at com.azure.messaging.servicebus.MessagePump.handleMessage(MessagePump.java:141) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at com.azure.messaging.servicebus.MessagePump$RunOnWorker.lambda$apply$0(MessagePump.java:226) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.Mono.subscribe(Mono.java:4576) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430) ~[reactor-core-3.7.4.jar:3.7.4]
at com.azure.messaging.servicebus.TracingFluxOperator.lambda$hookOnNext$0(TracingFluxOperator.java:69) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:99) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at com.azure.messaging.servicebus.TracingFluxOperator$1.lambda$subscribe$0(TracingFluxOperator.java:39) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at com.azure.messaging.servicebus.TracingFluxOperator.hookOnNext(TracingFluxOperator.java:68) ~[azure-messaging-servicebus-7.17.10.jar:7.17.10]
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.7.4.jar:3.7.4]
at com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:476) ~[azure-core-amqp-2.9.16.jar:2.9.16]
at com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:405) ~[azure-core-amqp-2.9.16.jar:2.9.16]
at com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:878) ~[azure-core-amqp-2.9.16.jar:2.9.16]
at com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:722) ~[azure-core-amqp-2.9.16.jar:2.9.16]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:446) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:533) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.7.4.jar:3.7.4]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.7.4.jar:3.7.4]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: java.lang.IllegalStateException: The [bean 'produceShutdownTest_integrationflow.channel#0'] doesn't have subscribers to accept messages
at org.springframework.util.Assert.state(Assert.java:101) ~[spring-core-6.2.5.jar:6.2.5]
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63) ~[spring-integration-core-6.4.3.jar:6.4.3]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390) ~[spring-integration-core-6.4.3.jar:6.4.3]
... 83 common frames omitted
To Reproduce
Created project to reproduce shutdown-investigation-spring-3.
EventBindingsLifecycle
is created to emulate long shutdown process (15sec) at phase Integer.MIN_VALUE + 1001
Code Snippet
@Component("consumeShutdownTest")
class ShutdownTestConsumer implements Consumer<ShutdownTestDto> {
private static final Logger log = LoggerFactory.getLogger(ShutdownTestConsumer.class);
@Autowired
private ShutdownTestSupplier supplier;
@Override
public void accept(ShutdownTestDto event) {
log.info("Consuming test event: {}, source: {}", event.id(), event.source());
sleep(event);
emitEventIfNeeded(event);
}
private void emitEventIfNeeded(ShutdownTestDto event) {
if (event.source().equals("test")) {
ShutdownTestDto newDto = new ShutdownTestDto(event.id() + "-out", "consumer");
log.info("Triggering new event from consumer: {}", newDto.id());
supplier.getSink().emitNext(
MessageBuilder.withPayload(newDto).build(),
Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(500))
);
}
}
private void sleep(ShutdownTestDto event) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while processing event {}", event.id());
}
}
}
Expected behavior
- Stop accepting new events for consumption
- Wait until all events are processed
- Stop output binders
Setup (please complete the following information):
- Gradle
- com.azure.spring:spring-cloud-azure-stream-binder-servicebus:5.22.0
- com.azure:azure-messaging-servicebus:7.17.10
- org.springframework.cloud:spring-cloud-stream:4.2.1
- Java version: 21
- Frameworks: Spring Boot 3.4.4
Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report
- Bug Description Added
- Repro Steps Added
- Setup information Added
Metadata
Metadata
Assignees
Labels
Type
Projects
Status