@@ -32,6 +32,7 @@ class MessageAndSessionPump extends InitializableEntity implements IMessageAndSe
3232 private static final Duration MINIMUM_MESSAGE_LOCK_VALIDITY = Duration .ofSeconds (4 );
3333 private static final Duration MAXIMUM_RENEW_LOCK_BUFFER = Duration .ofSeconds (10 );
3434 private static final Duration SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION = Duration .ofMinutes (1 );
35+ private static final int UNSET_PREFETCH_COUNT = -1 ; // Means prefetch count not set
3536
3637 private final ConcurrentHashMap <String , IMessageSession > openSessions ;
3738 private final MessagingFactory factory ;
@@ -44,13 +45,15 @@ class MessageAndSessionPump extends InitializableEntity implements IMessageAndSe
4445 private ISessionHandler sessionHandler ;
4546 private MessageHandlerOptions messageHandlerOptions ;
4647 private SessionHandlerOptions sessionHandlerOptions ;
48+ private int prefetchCount ;
4749
4850 public MessageAndSessionPump (MessagingFactory factory , String entityPath , ReceiveMode receiveMode ) {
4951 super (StringUtil .getShortRandomString (), null );
5052 this .factory = factory ;
5153 this .entityPath = entityPath ;
5254 this .receiveMode = receiveMode ;
5355 this .openSessions = new ConcurrentHashMap <>();
56+ this .prefetchCount = UNSET_PREFETCH_COUNT ;
5457 }
5558
5659 @ Override
@@ -67,6 +70,10 @@ public void registerMessageHandler(IMessageHandler handler, MessageHandlerOption
6770
6871 this .innerReceiver = ClientFactory .createMessageReceiverFromEntityPath (this .factory , this .entityPath , this .receiveMode );
6972 TRACE_LOGGER .info ("Created MessageReceiver to entity '{}'" , this .entityPath );
73+ if (this .prefetchCount != UNSET_PREFETCH_COUNT )
74+ {
75+ this .innerReceiver .setPrefetchCount (this .prefetchCount );
76+ }
7077 for (int i = 0 ; i < handlerOptions .getMaxConcurrentCalls (); i ++) {
7178 this .receiveAndPumpMessage ();
7279 }
@@ -221,6 +228,14 @@ private void acceptSessionsAndPumpMessage() {
221228 } else {
222229 // Received a session.. Now pump messages..
223230 TRACE_LOGGER .debug ("Accepted a session '{}' from entity '{}'" , session .getSessionId (), this .entityPath );
231+ if (this .prefetchCount != UNSET_PREFETCH_COUNT )
232+ {
233+ try {
234+ session .setPrefetchCount (this .prefetchCount );
235+ } catch (ServiceBusException e ) {
236+ // Should not happen as long as reactor is running. So ignoring
237+ }
238+ }
224239 this .openSessions .put (session .getSessionId (), session );
225240 SessionRenewLockLoop sessionRenewLockLoop = new SessionRenewLockLoop (session , this );
226241 sessionRenewLockLoop .startLoop ();
@@ -713,13 +728,34 @@ private void notifyExceptionToMessageHandler(Throwable ex, ExceptionPhase phase)
713728
714729 @ Override
715730 public int getPrefetchCount () {
716- this .checkInnerReceiveCreated ();
717- return this .innerReceiver .getPrefetchCount ();
731+ return this .prefetchCount ;
718732 }
719733
720734 @ Override
721735 public void setPrefetchCount (int prefetchCount ) throws ServiceBusException {
722- this .checkInnerReceiveCreated ();
723- this .innerReceiver .setPrefetchCount (prefetchCount );
736+ if (prefetchCount < 0 )
737+ {
738+ throw new IllegalArgumentException ("Prefetch count cannot be negative." );
739+ }
740+
741+ this .prefetchCount = prefetchCount ;
742+ if (this .innerReceiver != null )
743+ {
744+ this .innerReceiver .setPrefetchCount (prefetchCount );
745+ }
746+
747+ // For accepted session receivers also
748+ IMessageSession [] currentAcceptedSessions = this .openSessions .values ().toArray (new IMessageSession [0 ]);
749+ for (IMessageSession session : currentAcceptedSessions )
750+ {
751+ try
752+ {
753+ session .setPrefetchCount (prefetchCount );
754+ }
755+ catch (IllegalStateException ise )
756+ {
757+ // Session might have been closed.. Ignore the exception as this is a best effort setter on already accepted sessions
758+ }
759+ }
724760 }
725761}
0 commit comments