@@ -160,9 +160,16 @@ private void receiveAndPumpMessage() {
160160 }
161161 } else {
162162 // Abandon message
163- TRACE_LOGGER .debug ("Abandoning message with sequence number '{}'" , message .getSequenceNumber ());
164163 dispositionPhase = ExceptionPhase .ABANDON ;
165- updateDispositionFuture = this .innerReceiver .abandonAsync (message .getLockToken ());
164+ if (this .messageHandlerOptions .isAutoComplete ())
165+ {
166+ TRACE_LOGGER .debug ("Abandoning message with sequence number '{}'" , message .getSequenceNumber ());
167+ updateDispositionFuture = this .innerReceiver .abandonAsync (message .getLockToken ());
168+ }
169+ else
170+ {
171+ updateDispositionFuture = CompletableFuture .completedFuture (null );
172+ }
166173 }
167174
168175 updateDispositionFuture .handleAsync ((u , updateDispositionEx ) -> {
@@ -292,9 +299,16 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
292299 }
293300 } else {
294301 // Abandon message
295- TRACE_LOGGER .debug ("Abandoning message with sequence number '{}'" , message .getSequenceNumber ());
296302 dispositionPhase = ExceptionPhase .ABANDON ;
297- updateDispositionFuture = session .abandonAsync (message .getLockToken ());
303+ if (this .sessionHandlerOptions .isAutoComplete ())
304+ {
305+ TRACE_LOGGER .debug ("Abandoning message with sequence number '{}'" , message .getSequenceNumber ());
306+ updateDispositionFuture = session .abandonAsync (message .getLockToken ());
307+ }
308+ else
309+ {
310+ updateDispositionFuture = CompletableFuture .completedFuture (null );
311+ }
298312 }
299313
300314 updateDispositionFuture .handleAsync ((u , updateDispositionEx ) -> {
@@ -452,17 +466,16 @@ public void cancelLoop() {
452466 }
453467 }
454468
455- protected static Duration getNextRenewInterval (Instant lockedUntilUtc ) {
469+ protected static Duration getNextRenewInterval (Instant lockedUntilUtc , String identifier ) {
456470 Duration remainingTime = Duration .between (Instant .now (), lockedUntilUtc );
457471 if (remainingTime .isNegative ()) {
458-
459472 // Lock likely expired. May be there is clock skew. Assume some minimum time
460473 remainingTime = MessageAndSessionPump .MINIMUM_MESSAGE_LOCK_VALIDITY ;
461- TRACE_LOGGER .warn ("Lock already expired. May be there is clock skew. Still trying to renew lock" );
474+ TRACE_LOGGER .warn ("Lock of '{}' already expired. May be there is clock skew. Still trying to renew lock" , identifier );
462475 }
463476
464477 Duration buffer = remainingTime .dividedBy (2 ).compareTo (MAXIMUM_RENEW_LOCK_BUFFER ) > 0 ? MAXIMUM_RENEW_LOCK_BUFFER : remainingTime .dividedBy (2 );
465- TRACE_LOGGER .debug ("Lock is valid for '{}'. It will be renewed '{}' before it expires." , remainingTime , buffer );
478+ TRACE_LOGGER .debug ("Lock of '{}' is valid for '{}'. It will be renewed '{}' before it expires." , identifier , remainingTime , buffer );
466479 return remainingTime .minus (buffer );
467480 }
468481 }
@@ -472,6 +485,7 @@ private static class MessgeRenewLockLoop extends RenewLockLoop {
472485 private MessageAndSessionPump messageAndSessionPump ;
473486 private IMessage message ;
474487 private Instant stopRenewalAt ;
488+ private String messageIdentifier ;
475489 ScheduledFuture <?> timerFuture ;
476490
477491 MessgeRenewLockLoop (IMessageReceiver innerReceiver , MessageAndSessionPump messageAndSessionPump , IMessage message , Instant stopRenewalAt ) {
@@ -480,6 +494,7 @@ private static class MessgeRenewLockLoop extends RenewLockLoop {
480494 this .messageAndSessionPump = messageAndSessionPump ;
481495 this .message = message ;
482496 this .stopRenewalAt = stopRenewalAt ;
497+ this .messageIdentifier = String .format ("message with locktoken : %s, sequence number : %s" , this .message .getLockToken (), this .message .getSequenceNumber ());
483498 }
484499
485500 @ Override
@@ -493,22 +508,21 @@ protected void loop() {
493508 Duration renewInterval = this .getNextRenewInterval ();
494509 if (renewInterval != null && !renewInterval .isNegative ()) {
495510 this .timerFuture = Timer .schedule (() -> {
496- TRACE_LOGGER .debug ("Renewing lock on message with sequence number '{}'" , this .message . getSequenceNumber () );
511+ TRACE_LOGGER .debug ("Renewing lock on '{}'" , this .messageIdentifier );
497512 this .innerReceiver .renewMessageLockAsync (message ).handleAsync ((v , renewLockEx ) ->
498513 {
499514 if (renewLockEx != null ) {
500515 renewLockEx = ExceptionUtil .extractAsyncCompletionCause (renewLockEx );
501- TRACE_LOGGER .error ("Renewing lock on message with sequence number '{}' failed" , this .message . getSequenceNumber () , renewLockEx );
516+ TRACE_LOGGER .error ("Renewing lock on '{}' failed" , this .messageIdentifier , renewLockEx );
502517 this .messageAndSessionPump .notifyExceptionToMessageHandler (renewLockEx , ExceptionPhase .RENEWMESSAGELOCK );
503518 if (!(renewLockEx instanceof MessageLockLostException || renewLockEx instanceof OperationCancelledException )) {
504519 this .loop ();
505520 }
506521 } else {
507- TRACE_LOGGER .debug ("Renewed lock on message with sequence number '{}'" , this .message . getSequenceNumber () );
522+ TRACE_LOGGER .debug ("Renewed lock on '{}'" , this .messageIdentifier );
508523 this .loop ();
509524 }
510525
511-
512526 return null ;
513527 });
514528 }, renewInterval , TimerType .OneTimeRun );
@@ -518,7 +532,7 @@ protected void loop() {
518532
519533 private Duration getNextRenewInterval () {
520534 if (this .message .getLockedUntilUtc ().isBefore (stopRenewalAt )) {
521- return RenewLockLoop .getNextRenewInterval (this .message .getLockedUntilUtc ());
535+ return RenewLockLoop .getNextRenewInterval (this .message .getLockedUntilUtc (), this . messageIdentifier );
522536 } else {
523537 return null ;
524538 }
@@ -528,12 +542,14 @@ private Duration getNextRenewInterval() {
528542 private static class SessionRenewLockLoop extends RenewLockLoop {
529543 private IMessageSession session ;
530544 private MessageAndSessionPump messageAndSessionPump ;
545+ private String sessionIdentifier ;
531546 ScheduledFuture <?> timerFuture ;
532547
533548 SessionRenewLockLoop (IMessageSession session , MessageAndSessionPump messageAndSessionPump ) {
534549 super ();
535550 this .session = session ;
536551 this .messageAndSessionPump = messageAndSessionPump ;
552+ this .sessionIdentifier = String .format ("session with id:%s" , this .session .getSessionId ());
537553 }
538554
539555 @ Override
@@ -544,25 +560,24 @@ protected ScheduledFuture<?> getTimerFuture() {
544560 @ Override
545561 protected void loop () {
546562 if (!this .isCancelled ()) {
547- Duration renewInterval = RenewLockLoop .getNextRenewInterval (this .session .getLockedUntilUtc ());
563+ Duration renewInterval = RenewLockLoop .getNextRenewInterval (this .session .getLockedUntilUtc (), this . sessionIdentifier );
548564 if (renewInterval != null && !renewInterval .isNegative ()) {
549565 this .timerFuture = Timer .schedule (() -> {
550- TRACE_LOGGER .debug ("Renewing lock on session '{}'" , this .session . getSessionId () );
566+ TRACE_LOGGER .debug ("Renewing lock on '{}'" , this .sessionIdentifier );
551567 this .session .renewSessionLockAsync ().handleAsync ((v , renewLockEx ) ->
552568 {
553569 if (renewLockEx != null ) {
554570 renewLockEx = ExceptionUtil .extractAsyncCompletionCause (renewLockEx );
555- TRACE_LOGGER .error ("Renewing lock on session '{}' failed" , this .session . getSessionId () , renewLockEx );
571+ TRACE_LOGGER .error ("Renewing lock on '{}' failed" , this .sessionIdentifier , renewLockEx );
556572 this .messageAndSessionPump .notifyExceptionToSessionHandler (renewLockEx , ExceptionPhase .RENEWSESSIONLOCK );
557573 if (!(renewLockEx instanceof SessionLockLostException || renewLockEx instanceof OperationCancelledException )) {
558574 this .loop ();
559575 }
560576 } else {
561- TRACE_LOGGER .debug ("Renewed lock on session '{}'" , this .session . getSessionId () );
577+ TRACE_LOGGER .debug ("Renewed lock on '{}'" , this .sessionIdentifier );
562578 this .loop ();
563579 }
564580
565-
566581 return null ;
567582 });
568583 }, renewInterval , TimerType .OneTimeRun );
0 commit comments