@@ -356,7 +356,7 @@ public CompletableFuture<IMessage> receiveAsync() {
356356
357357 @ Override
358358 public CompletableFuture <IMessage > receiveAsync (Duration serverWaitTime ) {
359- return this .internalReceiver .receiveAsync (1 , serverWaitTime ).thenApplyAsync (c ->
359+ CompletableFuture < IMessage > receiveFuture = this .internalReceiver .receiveAsync (1 , serverWaitTime ).thenApplyAsync (c ->
360360 {
361361 if (c == null )
362362 return null ;
@@ -365,6 +365,8 @@ else if (c.isEmpty())
365365 else
366366 return MessageConverter .convertAmqpMessageToBrokeredMessage (c .toArray (new MessageWithDeliveryTag [0 ])[0 ]);
367367 });
368+
369+ return this .filterLockExpiredMessage (receiveFuture , serverWaitTime );
368370 }
369371
370372 @ Override
@@ -374,7 +376,7 @@ public CompletableFuture<Collection<IMessage>> receiveBatchAsync(int maxMessageC
374376
375377 @ Override
376378 public CompletableFuture <Collection <IMessage >> receiveBatchAsync (int maxMessageCount , Duration serverWaitTime ) {
377- return this .internalReceiver .receiveAsync (maxMessageCount , serverWaitTime ).thenApplyAsync (c ->
379+ CompletableFuture < Collection < IMessage >> receiveFuture = this .internalReceiver .receiveAsync (maxMessageCount , serverWaitTime ).thenApplyAsync (c ->
378380 {
379381 if (c == null )
380382 return null ;
@@ -383,6 +385,8 @@ else if (c.isEmpty())
383385 else
384386 return convertAmqpMessagesWithDeliveryTagsToBrokeredMessages (c );
385387 });
388+
389+ return this .filterLockExpiredMessages (receiveFuture , maxMessageCount , serverWaitTime );
386390 }
387391
388392 @ Override
@@ -499,6 +503,116 @@ private void ensurePeekLockReceiveMode() {
499503 throw new UnsupportedOperationException ("Operations Complete/Abandon/DeadLetter/Defer cannot be called on a receiver opened in ReceiveAndDelete mode." );
500504 }
501505 }
506+
507+ private boolean isMessageLockExpired (IMessage msg )
508+ {
509+ return msg .getLockedUntilUtc ().isBefore (Instant .now ());
510+ }
511+
512+ private CompletableFuture <IMessage > filterLockExpiredMessage (CompletableFuture <IMessage > receivedFuture , Duration serverWaitTime )
513+ {
514+ if (this .receiveMode == ReceiveMode .RECEIVEANDDELETE )
515+ {
516+ return receivedFuture ;
517+ }
518+ else
519+ {
520+ Instant startTime = Instant .now ();
521+ return receivedFuture .thenCompose ((msg ) -> {
522+ if (msg == null )
523+ {
524+ return receivedFuture ;
525+ }
526+ else
527+ {
528+ if (isMessageLockExpired (msg ))
529+ {
530+ // Message lock already expired. Receive another message
531+ TRACE_LOGGER .warn ("Lock of the prefetched message with sequence number '{}' and id '{}' from '{}' already expired" , msg .getSequenceNumber (), msg .getMessageId (), this .getEntityPath ());
532+ Duration remainingWaitTime = serverWaitTime .minus (Duration .between (startTime , Instant .now ()));
533+ if (remainingWaitTime .isNegative () || remainingWaitTime .isZero ())
534+ {
535+ return CompletableFuture .completedFuture (null );
536+ }
537+ else
538+ {
539+ TRACE_LOGGER .debug ("Ignored the lock exipred message and receiving again from '{}'" , this .getEntityPath ());
540+ return this .receiveAsync (remainingWaitTime );
541+ }
542+ }
543+ else
544+ {
545+ return CompletableFuture .completedFuture (msg );
546+ }
547+ }
548+ });
549+ }
550+ }
551+
552+ private CompletableFuture <Collection <IMessage >> filterLockExpiredMessages (CompletableFuture <Collection <IMessage >> receivedFuture , int maxMessageCount , Duration serverWaitTime )
553+ {
554+ if (this .receiveMode == ReceiveMode .RECEIVEANDDELETE )
555+ {
556+ return receivedFuture ;
557+ }
558+ else
559+ {
560+ Instant startTime = Instant .now ();
561+ return receivedFuture .thenCompose ((messages ) -> {
562+ if (messages == null || messages .size () == 0 )
563+ {
564+ return receivedFuture ;
565+ }
566+ else
567+ {
568+ boolean areMessagesRemoved = false ;
569+ Iterator <IMessage > msgIterator = messages .iterator ();
570+ while (msgIterator .hasNext ())
571+ {
572+ IMessage msg = msgIterator .next ();
573+ if (isMessageLockExpired (msg ))
574+ {
575+ // Message lock already expired. remove it
576+ TRACE_LOGGER .warn ("Lock of the prefetched message with sequence number '{}' and id '{}' from '{}' already expired. Removing it from the list of messages returned to the caller." , msg .getSequenceNumber (), msg .getMessageId (), this .getEntityPath ());
577+ msgIterator .remove ();
578+ areMessagesRemoved = true ;
579+ }
580+ else
581+ {
582+ // Break the loop. As next messages in the list are received from the entity after current message and are definitely not expired. No need to check them
583+ break ;
584+ }
585+ }
586+
587+ if (areMessagesRemoved )
588+ {
589+ if (messages .size () > 0 )
590+ {
591+ // There are some messages still in the list.. Just return them to the caller
592+ return CompletableFuture .completedFuture (messages );
593+ }
594+ else
595+ {
596+ Duration remainingWaitTime = serverWaitTime .minus (Duration .between (startTime , Instant .now ()));
597+ if (remainingWaitTime .isNegative () || remainingWaitTime .isZero ())
598+ {
599+ return CompletableFuture .completedFuture (null );
600+ }
601+ else
602+ {
603+ TRACE_LOGGER .debug ("All messages in the received list are lock expired. So receiving again from '{}'" , this .getEntityPath ());
604+ return this .receiveBatchAsync (maxMessageCount , remainingWaitTime );
605+ }
606+ }
607+ }
608+ else
609+ {
610+ return receivedFuture ;
611+ }
612+ }
613+ });
614+ }
615+ }
502616
503617 private CompletableFuture <Boolean > checkIfValidRequestResponseLockTokenAsync (UUID lockToken ) {
504618 CompletableFuture <Boolean > future = new CompletableFuture <Boolean >();
0 commit comments