@@ -160,16 +160,9 @@ private void receiveAndPumpMessage() {
160160 }
161161 } else {
162162 // Abandon message
163+ TRACE_LOGGER .debug ("Abandoning message with sequence number '{}'" , message .getSequenceNumber ());
163164 dispositionPhase = ExceptionPhase .ABANDON ;
164- if (this .messageHandlerOptions .isAutoComplete ())
165- {
166- TRACE_LOGGER .debug ("Abandoning message with sequence number '{}'" , message .getSequenceNumber ());
167- updateDispositionFuture = this .innerReceiver .abandonAsync (message .getLockToken ());
168- }
169- else
170- {
171- updateDispositionFuture = CompletableFuture .completedFuture (null );
172- }
165+ updateDispositionFuture = this .innerReceiver .abandonAsync (message .getLockToken ());
173166 }
174167
175168 updateDispositionFuture .handleAsync ((u , updateDispositionEx ) -> {
@@ -299,16 +292,9 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
299292 }
300293 } else {
301294 // Abandon message
295+ TRACE_LOGGER .debug ("Abandoning message with sequence number '{}'" , message .getSequenceNumber ());
302296 dispositionPhase = ExceptionPhase .ABANDON ;
303- if (this .sessionHandlerOptions .isAutoComplete ())
304- {
305- TRACE_LOGGER .debug ("Abandoning message with sequence number '{}'" , message .getSequenceNumber ());
306- updateDispositionFuture = session .abandonAsync (message .getLockToken ());
307- }
308- else
309- {
310- updateDispositionFuture = CompletableFuture .completedFuture (null );
311- }
297+ updateDispositionFuture = session .abandonAsync (message .getLockToken ());
312298 }
313299
314300 updateDispositionFuture .handleAsync ((u , updateDispositionEx ) -> {
@@ -466,16 +452,17 @@ public void cancelLoop() {
466452 }
467453 }
468454
469- protected static Duration getNextRenewInterval (Instant lockedUntilUtc , String identifier ) {
455+ protected static Duration getNextRenewInterval (Instant lockedUntilUtc ) {
470456 Duration remainingTime = Duration .between (Instant .now (), lockedUntilUtc );
471457 if (remainingTime .isNegative ()) {
458+
472459 // Lock likely expired. May be there is clock skew. Assume some minimum time
473460 remainingTime = MessageAndSessionPump .MINIMUM_MESSAGE_LOCK_VALIDITY ;
474- TRACE_LOGGER .warn ("Lock of '{}' already expired. May be there is clock skew. Still trying to renew lock" , identifier );
461+ TRACE_LOGGER .warn ("Lock already expired. May be there is clock skew. Still trying to renew lock" );
475462 }
476463
477464 Duration buffer = remainingTime .dividedBy (2 ).compareTo (MAXIMUM_RENEW_LOCK_BUFFER ) > 0 ? MAXIMUM_RENEW_LOCK_BUFFER : remainingTime .dividedBy (2 );
478- TRACE_LOGGER .debug ("Lock of '{}' is valid for '{}'. It will be renewed '{}' before it expires." , identifier , remainingTime , buffer );
465+ TRACE_LOGGER .debug ("Lock is valid for '{}'. It will be renewed '{}' before it expires." , remainingTime , buffer );
479466 return remainingTime .minus (buffer );
480467 }
481468 }
@@ -485,7 +472,6 @@ private static class MessgeRenewLockLoop extends RenewLockLoop {
485472 private MessageAndSessionPump messageAndSessionPump ;
486473 private IMessage message ;
487474 private Instant stopRenewalAt ;
488- private String messageIdentifier ;
489475 ScheduledFuture <?> timerFuture ;
490476
491477 MessgeRenewLockLoop (IMessageReceiver innerReceiver , MessageAndSessionPump messageAndSessionPump , IMessage message , Instant stopRenewalAt ) {
@@ -494,7 +480,6 @@ private static class MessgeRenewLockLoop extends RenewLockLoop {
494480 this .messageAndSessionPump = messageAndSessionPump ;
495481 this .message = message ;
496482 this .stopRenewalAt = stopRenewalAt ;
497- this .messageIdentifier = String .format ("message with locktoken : %s, sequence number : %s" , this .message .getLockToken (), this .message .getSequenceNumber ());
498483 }
499484
500485 @ Override
@@ -508,21 +493,22 @@ protected void loop() {
508493 Duration renewInterval = this .getNextRenewInterval ();
509494 if (renewInterval != null && !renewInterval .isNegative ()) {
510495 this .timerFuture = Timer .schedule (() -> {
511- TRACE_LOGGER .debug ("Renewing lock on '{}'" , this .messageIdentifier );
496+ TRACE_LOGGER .debug ("Renewing lock on message with sequence number '{}'" , this .message . getSequenceNumber () );
512497 this .innerReceiver .renewMessageLockAsync (message ).handleAsync ((v , renewLockEx ) ->
513498 {
514499 if (renewLockEx != null ) {
515500 renewLockEx = ExceptionUtil .extractAsyncCompletionCause (renewLockEx );
516- TRACE_LOGGER .error ("Renewing lock on '{}' failed" , this .messageIdentifier , renewLockEx );
501+ TRACE_LOGGER .error ("Renewing lock on message with sequence number '{}' failed" , this .message . getSequenceNumber () , renewLockEx );
517502 this .messageAndSessionPump .notifyExceptionToMessageHandler (renewLockEx , ExceptionPhase .RENEWMESSAGELOCK );
518503 if (!(renewLockEx instanceof MessageLockLostException || renewLockEx instanceof OperationCancelledException )) {
519504 this .loop ();
520505 }
521506 } else {
522- TRACE_LOGGER .debug ("Renewed lock on '{}'" , this .messageIdentifier );
507+ TRACE_LOGGER .debug ("Renewed lock on message with sequence number '{}'" , this .message . getSequenceNumber () );
523508 this .loop ();
524509 }
525510
511+
526512 return null ;
527513 });
528514 }, renewInterval , TimerType .OneTimeRun );
@@ -532,7 +518,7 @@ protected void loop() {
532518
533519 private Duration getNextRenewInterval () {
534520 if (this .message .getLockedUntilUtc ().isBefore (stopRenewalAt )) {
535- return RenewLockLoop .getNextRenewInterval (this .message .getLockedUntilUtc (), this . messageIdentifier );
521+ return RenewLockLoop .getNextRenewInterval (this .message .getLockedUntilUtc ());
536522 } else {
537523 return null ;
538524 }
@@ -542,14 +528,12 @@ private Duration getNextRenewInterval() {
542528 private static class SessionRenewLockLoop extends RenewLockLoop {
543529 private IMessageSession session ;
544530 private MessageAndSessionPump messageAndSessionPump ;
545- private String sessionIdentifier ;
546531 ScheduledFuture <?> timerFuture ;
547532
548533 SessionRenewLockLoop (IMessageSession session , MessageAndSessionPump messageAndSessionPump ) {
549534 super ();
550535 this .session = session ;
551536 this .messageAndSessionPump = messageAndSessionPump ;
552- this .sessionIdentifier = String .format ("session with id:%s" , this .session .getSessionId ());
553537 }
554538
555539 @ Override
@@ -560,24 +544,25 @@ protected ScheduledFuture<?> getTimerFuture() {
560544 @ Override
561545 protected void loop () {
562546 if (!this .isCancelled ()) {
563- Duration renewInterval = RenewLockLoop .getNextRenewInterval (this .session .getLockedUntilUtc (), this . sessionIdentifier );
547+ Duration renewInterval = RenewLockLoop .getNextRenewInterval (this .session .getLockedUntilUtc ());
564548 if (renewInterval != null && !renewInterval .isNegative ()) {
565549 this .timerFuture = Timer .schedule (() -> {
566- TRACE_LOGGER .debug ("Renewing lock on '{}'" , this .sessionIdentifier );
550+ TRACE_LOGGER .debug ("Renewing lock on session '{}'" , this .session . getSessionId () );
567551 this .session .renewSessionLockAsync ().handleAsync ((v , renewLockEx ) ->
568552 {
569553 if (renewLockEx != null ) {
570554 renewLockEx = ExceptionUtil .extractAsyncCompletionCause (renewLockEx );
571- TRACE_LOGGER .error ("Renewing lock on '{}' failed" , this .sessionIdentifier , renewLockEx );
555+ TRACE_LOGGER .error ("Renewing lock on session '{}' failed" , this .session . getSessionId () , renewLockEx );
572556 this .messageAndSessionPump .notifyExceptionToSessionHandler (renewLockEx , ExceptionPhase .RENEWSESSIONLOCK );
573557 if (!(renewLockEx instanceof SessionLockLostException || renewLockEx instanceof OperationCancelledException )) {
574558 this .loop ();
575559 }
576560 } else {
577- TRACE_LOGGER .debug ("Renewed lock on '{}'" , this .sessionIdentifier );
561+ TRACE_LOGGER .debug ("Renewed lock on session '{}'" , this .session . getSessionId () );
578562 this .loop ();
579563 }
580564
565+
581566 return null ;
582567 });
583568 }, renewInterval , TimerType .OneTimeRun );
0 commit comments