@@ -98,6 +98,8 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver,
9898 private Exception lastKnownLinkError ;
9999 private Instant lastKnownErrorReportedAt ;
100100 private final AtomicInteger creditToFlow ;
101+ private final AtomicInteger creditNeededtoServePendingReceives ;
102+ private final AtomicInteger currentPrefetechedMessagesCount ; // size() on concurrentlinkedqueue is o(n) operation
101103 private ScheduledFuture <?> sasTokenRenewTimerFuture ;
102104 private CompletableFuture <Void > requestResponseLinkCreationFuture ;
103105 private CompletableFuture <Void > receiveLinkReopenFuture ;
@@ -138,13 +140,15 @@ private CoreMessageReceiver(final MessagingFactory factory,
138140 this .lastKnownErrorReportedAt = Instant .now ();
139141 this .receiveLinkReopenFuture = null ;
140142 this .creditToFlow = new AtomicInteger ();
143+ this .creditNeededtoServePendingReceives = new AtomicInteger ();
144+ this .currentPrefetechedMessagesCount = new AtomicInteger ();
141145
142146 this .timedOutUpdateStateRequestsDaemon = new Runnable () {
143147 @ Override
144148 public void run () {
145149 try
146150 {
147- TRACE_LOGGER .debug ("Starting '{}' core message receiver's internal loop to complete timed out update state requests." , CoreMessageReceiver .this .receivePath );
151+ TRACE_LOGGER .trace ("Starting '{}' core message receiver's internal loop to complete timed out update state requests." , CoreMessageReceiver .this .receivePath );
148152 for (Map .Entry <String , UpdateStateWorkItem > entry : CoreMessageReceiver .this .pendingUpdateStateRequests .entrySet ())
149153 {
150154 Duration remainingTime = entry .getValue ().getTimeoutTracker ().remaining ();
@@ -160,7 +164,7 @@ public void run() {
160164 entry .getValue ().getWork ().completeExceptionally (exception );
161165 }
162166 }
163- TRACE_LOGGER .debug ("'{}' core message receiver's internal loop to complete timed out update state requests stopped." , CoreMessageReceiver .this .receivePath );
167+ TRACE_LOGGER .trace ("'{}' core message receiver's internal loop to complete timed out update state requests stopped." , CoreMessageReceiver .this .receivePath );
164168 }
165169 catch (Throwable e )
166170 {
@@ -175,7 +179,7 @@ public void run() {
175179 public void run () {
176180 try
177181 {
178- TRACE_LOGGER .debug ("Starting '{}' core message receiver's internal loop to return messages to waiting clients." , CoreMessageReceiver .this .receivePath );
182+ TRACE_LOGGER .trace ("Starting '{}' core message receiver's internal loop to return messages to waiting clients." , CoreMessageReceiver .this .receivePath );
179183 while (!CoreMessageReceiver .this .prefetchedMessages .isEmpty ())
180184 {
181185 ReceiveWorkItem currentReceive = CoreMessageReceiver .this .pendingReceives .poll ();
@@ -186,6 +190,7 @@ public void run() {
186190 TRACE_LOGGER .debug ("Returning the message received from '{}' to a pending receive request" , CoreMessageReceiver .this .receivePath );
187191 currentReceive .cancelTimeoutTask (false );
188192 List <MessageWithDeliveryTag > messages = CoreMessageReceiver .this .receiveCore (currentReceive .getMaxMessageCount ());
193+ CoreMessageReceiver .this .reduceCreditForCompletedReceiveRequest (currentReceive .getMaxMessageCount ());
189194 AsyncUtil .completeFuture (currentReceive .getWork (), messages );
190195 }
191196 }
@@ -194,7 +199,7 @@ public void run() {
194199 break ;
195200 }
196201 }
197- TRACE_LOGGER .debug ("'{}' core message receiver's internal loop to return messages to waiting clients stopped." , CoreMessageReceiver .this .receivePath );
202+ TRACE_LOGGER .trace ("'{}' core message receiver's internal loop to return messages to waiting clients stopped." , CoreMessageReceiver .this .receivePath );
198203 }
199204 catch (Throwable e )
200205 {
@@ -427,6 +432,7 @@ private List<MessageWithDeliveryTag> receiveCore(int messageCount)
427432 int returnedMessageCount = 0 ;
428433 while (currentMessage != null )
429434 {
435+ this .currentPrefetechedMessagesCount .decrementAndGet ();
430436 if (returnMessages == null )
431437 {
432438 returnMessages = new LinkedList <MessageWithDeliveryTag >();
@@ -518,6 +524,7 @@ public CompletableFuture<Collection<MessageWithDeliveryTag>> receiveAsync(final
518524 TRACE_LOGGER .debug ("Receiving maximum of '{}' messages from '{}'" , maxMessageCount , this .receivePath );
519525 CompletableFuture <Collection <MessageWithDeliveryTag >> onReceive = new CompletableFuture <Collection <MessageWithDeliveryTag >>();
520526 final ReceiveWorkItem receiveWorkItem = new ReceiveWorkItem (onReceive , timeout , maxMessageCount );
527+ this .creditNeededtoServePendingReceives .addAndGet (maxMessageCount );
521528 this .pendingReceives .add (receiveWorkItem );
522529 // ZERO timeout is special case in SBMP clients where the timeout is sent to the service along with request. It meant 'give me messages you already have, but don't wait'.
523530 // As we don't send timeout to service in AMQP, treating this as a special case and using a very short timeout
@@ -533,33 +540,7 @@ public void run()
533540 {
534541 if ( CoreMessageReceiver .this .pendingReceives .remove (receiveWorkItem ))
535542 {
536- // TODO: can we do it better?
537- // workaround to push the sendflow-performative to reactor
538- // this sets the receiveLink endpoint to modified state
539- // (and increment the unsentCredits in proton by 0)
540- // try
541- // {
542- // CoreMessageReceiver.this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler()
543- // {
544- // @Override
545- // public void onEvent()
546- // {
547- // //TODO: not working
548- // // Make credit 0, to stop further receiving on this link
549- // //MessageReceiver.this.receiveLink.flow(-1 * MessageReceiver.this.receiveLink.getCredit());
550- // CoreMessageReceiver.this.receiveLink.flow(0);
551- //
552- // // See if detach stops
553- //// MessageReceiver.this.receiveLink.detach();
554- //// MessageReceiver.this.receiveLink.close();
555- //// MessageReceiver.this.underlyingFactory.deregisterForConnectionError(MessageReceiver.this.receiveLink);
556- // }
557- // });
558- // }
559- // catch (IOException ignore)
560- // {
561- // }
562-
543+ CoreMessageReceiver .this .reduceCreditForCompletedReceiveRequest (receiveWorkItem .getMaxMessageCount ());
563544 TRACE_LOGGER .warn ("No messages received from '{}'. Pending receive request timed out. Returning null to the client." , CoreMessageReceiver .this .receivePath );
564545 receiveWorkItem .getWork ().complete (null );
565546 }
@@ -622,7 +603,7 @@ public void onOpenComplete(Exception exception)
622603
623604 this .underlyingFactory .getRetryPolicy ().resetRetryCount (this .underlyingFactory .getClientId ());
624605
625- this .sendFlow (this .prefetchCount - this .prefetchedMessages . size ());
606+ this .sendFlow (this .prefetchCount - this .currentPrefetechedMessagesCount . get ());
626607
627608 TRACE_LOGGER .debug ("receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}" ,
628609 this .receivePath , this .receiveLink .getName (), this .receiveLink .getCredit (), this .prefetchCount );
@@ -681,6 +662,8 @@ public void onReceiveComplete(Delivery delivery)
681662 receiveLink .advance ();
682663 }
683664
665+ // Accuracy of count is not that important. So not making those two operations atomic
666+ this .currentPrefetechedMessagesCount .incrementAndGet ();
684667 this .prefetchedMessages .add (new MessageWithDeliveryTag (message , delivery .getTag ()));
685668 }
686669 catch (Exception e )
@@ -796,6 +779,7 @@ public void onError(Exception exception)
796779 if (this .settleModePair .getSenderSettleMode () == SenderSettleMode .UNSETTLED )
797780 {
798781 this .prefetchedMessages .clear ();
782+ this .currentPrefetechedMessagesCount .set (0 );
799783 this .tagsToDeliveriesMap .clear ();
800784 }
801785
@@ -848,30 +832,45 @@ public void onError(Exception exception)
848832 }
849833 }
850834
835+ private void reduceCreditForCompletedReceiveRequest (int maxCreditCountOfReceiveRequest )
836+ {
837+ this .creditNeededtoServePendingReceives .updateAndGet ((c ) -> {
838+ int updatedCredit = c - maxCreditCountOfReceiveRequest ;
839+ return (updatedCredit > 0 ) ? updatedCredit : 0 ;
840+ });
841+ }
842+
851843 private void addCredit (ReceiveWorkItem receiveWorkItem )
852844 {
853- int currentTotalCreditToSend = this .creditToFlow .addAndGet (receiveWorkItem .getMaxMessageCount ());
854- if (currentTotalCreditToSend >= this .prefetchCount || currentTotalCreditToSend >= CREDIT_FLOW_BATCH_SIZE )
845+ // Timed out receive requests and batch receive requests completed with less than maxCount messages might have sent more credit
846+ // than consumed by the receiver resulting in excess credit at the service endpoint.
847+ int creditToFlowForWorkItem = this .creditNeededtoServePendingReceives .get () - (this .receiveLink .getCredit () + this .currentPrefetechedMessagesCount .get () + this .creditToFlow .get ()) + this .prefetchCount ;
848+ if (creditToFlowForWorkItem > 0 )
855849 {
856- try
857- {
858- this .underlyingFactory .scheduleOnReactorThread (new DispatchHandler ()
859- {
860- @ Override
861- public void onEvent ()
862- {
863- // Send credit accumulated so far to make it less chat-ty
864- int accumulatedCredit = CoreMessageReceiver .this .creditToFlow .getAndSet (0 );
865- sendFlow (accumulatedCredit );
866- }
867- });
868- }
869- catch (IOException ioException )
870- {
871- this .pendingReceives .remove (receiveWorkItem );
872- AsyncUtil .completeFutureExceptionally (receiveWorkItem .getWork (), generateDispatacherSchedulingFailedException ("completeMessage" , ioException ));
873- receiveWorkItem .cancelTimeoutTask (false );
874- }
850+ int currentTotalCreditToSend = this .creditToFlow .addAndGet (creditToFlowForWorkItem );
851+ if (currentTotalCreditToSend >= this .prefetchCount || currentTotalCreditToSend >= CREDIT_FLOW_BATCH_SIZE )
852+ {
853+ try
854+ {
855+ this .underlyingFactory .scheduleOnReactorThread (new DispatchHandler ()
856+ {
857+ @ Override
858+ public void onEvent ()
859+ {
860+ // Send credit accumulated so far to make it less chat-ty
861+ int accumulatedCredit = CoreMessageReceiver .this .creditToFlow .getAndSet (0 );
862+ sendFlow (accumulatedCredit );
863+ }
864+ });
865+ }
866+ catch (IOException ioException )
867+ {
868+ this .pendingReceives .remove (receiveWorkItem );
869+ this .reduceCreditForCompletedReceiveRequest (receiveWorkItem .getMaxMessageCount ());
870+ AsyncUtil .completeFutureExceptionally (receiveWorkItem .getWork (), generateDispatacherSchedulingFailedException ("completeMessage" , ioException ));
871+ receiveWorkItem .cancelTimeoutTask (false );
872+ }
873+ }
875874 }
876875 }
877876
@@ -958,7 +957,7 @@ public ErrorContext getContext()
958957 referenceId ,
959958 isLinkOpened ? this .prefetchCount : null ,
960959 isLinkOpened && this .receiveLink != null ? this .receiveLink .getCredit (): null ,
961- isLinkOpened && this .prefetchedMessages != null ? this . prefetchedMessages . size (): null );
960+ this .currentPrefetechedMessagesCount . get () );
962961
963962 return errorContext ;
964963 }
@@ -1214,6 +1213,8 @@ private void clearAllPendingWorkItems(Exception exception)
12141213 pendingRecivesIterator .remove ();
12151214
12161215 CompletableFuture <Collection <MessageWithDeliveryTag >> future = workItem .getWork ();
1216+ workItem .cancelTimeoutTask (false );
1217+ this .reduceCreditForCompletedReceiveRequest (workItem .getMaxMessageCount ());
12171218 if (isTransientException )
12181219 {
12191220 AsyncUtil .completeFuture (future , null );
@@ -1222,8 +1223,6 @@ private void clearAllPendingWorkItems(Exception exception)
12221223 {
12231224 ExceptionUtil .completeExceptionally (future , exception , this , true );
12241225 }
1225-
1226- workItem .cancelTimeoutTask (false );
12271226 }
12281227
12291228 for (Map .Entry <String , UpdateStateWorkItem > pendingUpdate : this .pendingUpdateStateRequests .entrySet ())
0 commit comments