
Spring cloud stream service bus binder health recovery #35266

Open
@dhananjay12

Description


Background

We are using the Service Bus binder, which pushes messages to Azure Service Bus topics. The app runs in a K8s environment.

spring-cloud-azure-stream-binder-servicebus - 4.4.1
Spring Boot - 2.7.7

In some situations of unknown cause, connectivity between the app and Azure Service Bus is lost, which causes the readiness probe to go DOWN.

In the logs I can see the expected error:

2023-06-02 03:20:39.976 ERROR 63250 --- [ctor-executor-9] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.messaging.servicebus.ServiceBusException: Retries exhausted: 3/3
Caused by: com.azure.messaging.servicebus.ServiceBusException: Retries exhausted: 3/3
	at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.mapError(ServiceBusSenderAsyncClient.java:848) ~[azure-messaging-servicebus-7.11.0.jar:7.11.0]
	at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3811) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onError(FluxDoOnEach.java:195) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:865) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onError(FluxReplay.java:1360) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:231) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.InternalEmptySink.emitEmpty(InternalEmptySink.java:26) ~[reactor-core-3.4.26.jar:3.4.26]
	at com.azure.core.amqp.implementation.ReactorConnection.lambda$closeConnectionWork$34(ReactorConnection.java:527) ~[azure-core-amqp-2.7.1.jar:2.7.1]
	at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:228) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.publisher.InternalEmptySink.emitEmpty(InternalEmptySink.java:26) ~[reactor-core-3.4.26.jar:3.4.26]
	at com.azure.core.amqp.implementation.ReactorExecutor.close(ReactorExecutor.java:188) ~[azure-core-amqp-2.7.1.jar:2.7.1]
	at com.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$1(ReactorExecutor.java:173) ~[azure-core-amqp-2.7.1.jar:2.7.1]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.26.jar:3.4.26]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.26.jar:3.4.26]
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
	at reactor.core.Exceptions.retryExhausted(Exceptions.java:294) ~[reactor-core-3.4.26.jar:3.4.26]

Spring Actuator configuration for readiness/liveness:

management.endpoint.health.group.readiness.include=*
management.endpoint.health.group.readiness.show-details=always
management.endpoint.health.group.liveness.include=ping
management.endpoint.health.group.liveness.show-details=never

readiness.show-details=always is set only for debugging the application.

Because the readiness probe is DOWN, K8s stops sending traffic to the pod.
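The reason a single failing binder takes the whole pod out of rotation: with readiness.include=*, the readiness group aggregates every registered health contributor and reports the most severe status. Below is a minimal, stdlib-only Java sketch of that "worst status wins" rule, assuming the default severity order DOWN, OUT_OF_SERVICE, UP, UNKNOWN. HealthStatus and HealthGroupSketch are illustrative names, not Spring Boot classes, and the contributor names are examples.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative status type; declaration order = severity order (assumed
// default: DOWN, OUT_OF_SERVICE, UP, UNKNOWN). Not Spring Boot's Status class.
enum HealthStatus { DOWN, OUT_OF_SERVICE, UP, UNKNOWN }

class HealthGroupSketch {
    // Returns the most severe status among the group's contributors,
    // or UNKNOWN if the group is empty.
    static HealthStatus aggregate(Map<String, HealthStatus> contributors) {
        HealthStatus worst = HealthStatus.UNKNOWN;
        for (HealthStatus status : contributors.values()) {
            if (status.ordinal() < worst.ordinal()) {
                worst = status;
            }
        }
        return worst;
    }

    public static void main(String[] args) {
        // include=* pulls every contributor into the readiness group.
        Map<String, HealthStatus> readiness = new LinkedHashMap<>();
        readiness.put("ping", HealthStatus.UP);
        readiness.put("diskSpace", HealthStatus.UP);
        readiness.put("binders", HealthStatus.DOWN); // after "Retries exhausted: 3/3"
        System.out.println(aggregate(readiness)); // prints DOWN
    }
}
```

Using the enum's declaration order for severity keeps the sketch short; in Spring Boot the order is supplied by the configured status aggregator.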

Issue

After the connection is restored, the readiness probe remains DOWN and does not recover. In this state a successful request would technically be possible, but it never occurs because Kubernetes has stopped routing traffic to the affected container.

When reproducing this issue locally, if I send a request to the service, it establishes a connection and sends the message to Service Bus, and the health endpoint status returns to UP. In Kubernetes, however, traffic to the container is blocked, so it stays unavailable indefinitely.
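The difference between the local run and Kubernetes is a feedback loop: the binder's health only changes when a send is attempted, and under Kubernetes a send is only attempted while the pod is ready. The toy Java simulation below models that loop under hypothetical assumptions (one request per tick, no connectivity during ticks 2 and 3, health equal to the outcome of the last send). It illustrates the behavior described above; it is not the binder's code.

```java
// Toy model of the readiness feedback loop (hypothetical, not binder code).
class ReadinessLoopSketch {
    static final int OUTAGE_START = 2; // first tick without connectivity
    static final int OUTAGE_END = 4;   // first tick with connectivity restored

    // Returns the binder health after the given number of ticks.
    // gatedByReadiness = true models K8s: requests reach the app only while it is ready.
    static boolean simulate(boolean gatedByReadiness, int ticks) {
        boolean healthy = true; // health reflects the outcome of the last send
        for (int tick = 0; tick < ticks; tick++) {
            boolean networkUp = tick < OUTAGE_START || tick >= OUTAGE_END;
            boolean requestRouted = !gatedByReadiness || healthy;
            if (requestRouted) {
                // Only a send attempt can change the health state.
                healthy = networkUp;
            }
        }
        return healthy;
    }

    public static void main(String[] args) {
        System.out.println("local: " + simulate(false, 10)); // prints local: true
        System.out.println("k8s:   " + simulate(true, 10));  // prints k8s:   false
    }
}
```

Locally, requests keep arriving regardless of health, so the first send after the outage restores UP; with readiness gating, the first failed send stops all further sends, so nothing can flip the state back.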

I found this class in the Azure binder:

The health instrumentation for the topic stays DOWN until there is a successful delivery, which cannot happen in K8s because no traffic reaches the pod.
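One way to avoid depending on a successful delivery would be for the health state to be re-checked actively while it is DOWN. The following is a hypothetical design sketch, not the binder's API or behavior. ProbingInstrumentationSketch, the probe supplier (standing in for something like a lightweight connection check) and the 30-second interval are all assumptions; the clock is injectable so the behavior can be shown deterministically.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical health state that can recover without business traffic:
// while DOWN, an active probe is retried at most once per interval.
// All names are illustrative; this is not the binder's API.
class ProbingInstrumentationSketch {
    private final BooleanSupplier probe;    // hypothetical connectivity check
    private final LongSupplier clockMillis; // injectable clock
    private final long probeIntervalMillis;
    private boolean up = true;
    private long lastProbeMillis = 0;

    ProbingInstrumentationSketch(BooleanSupplier probe, LongSupplier clockMillis, long probeIntervalMillis) {
        this.probe = probe;
        this.clockMillis = clockMillis;
        this.probeIntervalMillis = probeIntervalMillis;
    }

    // Called by the sender on delivery outcomes (send-driven updates).
    void recordSendSuccess() { up = true; }

    void recordSendError() {
        up = false;
        lastProbeMillis = clockMillis.getAsLong(); // start the probe interval now
    }

    // Called by the health check; probes only while DOWN and only once per interval.
    boolean isUp() {
        if (!up) {
            long now = clockMillis.getAsLong();
            if (now - lastProbeMillis >= probeIntervalMillis) {
                lastProbeMillis = now;
                up = probe.getAsBoolean();
            }
        }
        return up;
    }

    public static void main(String[] args) {
        long[] now = {0};
        boolean[] networkUp = {false};
        ProbingInstrumentationSketch health =
                new ProbingInstrumentationSketch(() -> networkUp[0], () -> now[0], 30_000);
        health.recordSendError();          // "Retries exhausted" at t = 0
        System.out.println(health.isUp()); // prints false
        networkUp[0] = true;               // connectivity restored
        now[0] = 10_000;
        System.out.println(health.isUp()); // prints false: interval not yet elapsed
        now[0] = 30_000;
        System.out.println(health.isUp()); // prints true: probe succeeded, no send needed
    }
}
```

Throttling the probe keeps frequent Kubernetes readiness checks from triggering a connection attempt on every call.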

Is there any other configuration you would suggest to tackle this scenario?

Setup

The only way I could reproduce this locally is with a simple app: while the connection to Service Bus is stable, I make a few successful requests and then intentionally disable the internet connection on my laptop. The next request throws a retry exception, causing the readiness probe to fail (which in a K8s environment would make the application unavailable). Even after the internet connection is restored, the app's readiness probe never recovers on its own.

Simple app - https://github.com/dhananjay12/learn-azure/tree/main/scs-service-bus
NOTE - We connect to the Service Bus topic using a service principal.

Metadata

Assignees: none
Labels: Client, azure-spring, azure-spring-servicebus, customer-reported, needs-team-attention, question
Type: No type
Projects: Status: Todo
Milestone: none
Relationships: None yet
Development: No branches or pull requests