Try timeout on reading messages #439

@cristibozga

We are migrating from the old Service Bus SDK to the new one, and after the first deployment we had several errors.
One of the problems is:

Our logs contain a lot of timeout errors when reading messages from session or non-session queues.
The try timeout is configured to 3 seconds:

    private static AmqpRetryOptions retryOptions(QueueListenerProperties properties) {
        AmqpRetryOptions o = new AmqpRetryOptions();
        o.setTryTimeout(Duration.ofMillis(properties.getTimeout()));
        o.setMode(AmqpRetryMode.FIXED);
        return o;
    }

The error we receive in the logs is:

com.azure.core.amqp.exception.AmqpException: The operation did not complete within the allotted timeout of 00:00:02. The time allotted to this operation may have been a portion of a longer timeout. For more information on exception types and proper exception handling, please refer to https://aka.ms/ServiceBusExceptions . TrackingId:0011cc1d-1633-4861-84a6-439cbab0728f_B59, SystemTracker:gi::G58:-1662651970:session-_1eff35_1735548817227, bi::in-connection1401(G58-185004108)::session1505::link199434916, Timestamp:2024-12-30T08:53:39 TrackingId:efe74dafc293404494ea059f012ebfa9_G58, SystemTracker:gateway10, Timestamp:2024-12-30T08:53:39, errorContext[NAMESPACE: sbus-rnd-dev-main-qzlcup.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: slot_booking_location_event_listener_queue, REFERENCE_ID: session-_1eff35_1735548817227, LINK_CREDIT: 0]
at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90)
at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:120)
at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:63)
at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2.onLinkRemoteClose(ReceiveLinkHandler2.java:176)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)

(As a remark, the logs show the timeout as 2 seconds, although it was set to 3 seconds. I noticed that the logged value is always the configured value minus 1 second. Why?)
Usually, when there are no messages to read, we get logs like these:

INFO  com.azure.messaging.servicebus.SessionsMessagePump$RollingSessionReceiver  - {"az.sdk.message":"Did not a receive message within timeout.","namespace":"dummy-standard.servicebus.windows.net","entityPath":"{queueName}","roller-id":1,"sessionId":"{sessionId}","linkName":"session-_1de54b_1735548588851","timeout":"{timeout}"}
 WARN  com.azure.core.amqp.implementation.MessageFlux  - {"az.sdk.message":"Receiver emitted terminal completion.","messageFlux":"mf_678781_1735548588203","connectionId":"MF_cbecdf_1735548588231","linkName":"session-_1de54b_1735548588851","entityPath":"{queue}"}
WARN  com.azure.core.amqp.implementation.MessageFlux  - {"az.sdk.message":"Current mediator reached terminal completion-state (retriable:true).","messageFlux":"mf_678781_1735548588203","connectionId":"MF_cbecdf_1735548588231","linkName":"session-_1de54b_1735548588851","entityPath":"{queue}","retryAfter":1000}

What I want to achieve is to have the thread (especially for sessions) wait at most 2 seconds for a new message from a queue.
With the old SDK, we used 'messageWaitDuration' = 1 second for session queues and 3 seconds for non-session queues, and it worked pretty well.
How can we set up the queue readers so that we do not block on sessions without messages and allow readers to check sessions that do receive messages?
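
To make the desired behavior concrete, here is a minimal sketch that uses plain in-memory queues from the JDK. It is not Service Bus SDK code and says nothing about how the SDK works internally; the class `BoundedSessionWait` and the method `pollOnce` are hypothetical names used only for illustration. It shows a reader that waits a bounded time per session and treats an idle session as a normal outcome instead of an error:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedSessionWait {

    // Hypothetical model: each "session" is an in-memory queue, not a Service Bus link.
    static List<String> pollOnce(Map<String, BlockingQueue<String>> sessions, Duration maxWait) {
        List<String> received = new ArrayList<>();
        for (Map.Entry<String, BlockingQueue<String>> session : sessions.entrySet()) {
            String message;
            try {
                // Wait at most maxWait for a message; null means the session was idle.
                message = session.getValue().poll(maxWait.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                break;
            }
            if (message != null) {
                received.add(session.getKey() + ":" + message);
            }
            // An idle session is not an error; the reader simply moves on to the next one.
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, BlockingQueue<String>> sessions = new LinkedHashMap<>();
        sessions.put("idle-session", new LinkedBlockingQueue<>());
        BlockingQueue<String> busy = new LinkedBlockingQueue<>();
        busy.add("m1");
        sessions.put("busy-session", busy);

        // The idle session costs at most the bounded wait; the busy one is served right after.
        List<String> received = pollOnce(sessions, Duration.ofMillis(200));
        System.out.println(received);
    }
}
```

In this model the idle session delays the reader by at most the configured wait, after which the busy session is served. That is the behavior we had with 'messageWaitDuration' in the old SDK and would like to reproduce.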

The code to create message processors is:

    private ServiceBusProcessorClient createQueueConsumer(String queuePath, QueueListenerProperties queueListenerProperties) {

        ServiceBusClientBuilder.ServiceBusProcessorClientBuilder builder = new ServiceBusClientBuilder()
                .connectionString(serviceBusProperties.getConnectionString())
                .retryOptions(retryOptions(queueListenerProperties))
                .processor()
                .maxConcurrentCalls(queueListenerProperties.getMaxConcurrentCalls())
                .queueName(queuePath);
        builder.processMessage(messageHandler);
        builder.processError(errorHandler);
        return builder.buildProcessorClient();
    }

    private ServiceBusProcessorClient createSessionQueueConsumer(String queuePath, SessionQueueListenerProperties queueListenerProperties) {

        ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder builder = new ServiceBusClientBuilder()
                .connectionString(serviceBusProperties.getConnectionString())
                .retryOptions(retryOptions(queueListenerProperties))
                .sessionProcessor()
                .maxConcurrentSessions(queueListenerProperties.getMaxConcurrentCalls())
                .maxConcurrentCalls(queueListenerProperties.getMaxConcurrentCallsPerSession())
                .queueName(queuePath);
        builder.processMessage(messageHandler);
        builder.processError(errorHandler);
        return builder.buildProcessorClient();
    }

We allow one single consumer for one session and 10 parallel sessions (by default).

Note: I updated the timeout to 9 seconds and I still get a couple of timeout errors. They all seem to be on session queues.

SDK version: 7.17.7
Java version: 17

Thank you,
Cristi
