@@ -356,7 +356,7 @@ public CompletableFuture<IMessage> receiveAsync() {
356356
357357 @ Override
358358 public CompletableFuture <IMessage > receiveAsync (Duration serverWaitTime ) {
359- CompletableFuture < IMessage > receiveFuture = this .internalReceiver .receiveAsync (1 , serverWaitTime ).thenApplyAsync (c ->
359+ return this .internalReceiver .receiveAsync (1 , serverWaitTime ).thenApplyAsync (c ->
360360 {
361361 if (c == null )
362362 return null ;
@@ -365,8 +365,6 @@ else if (c.isEmpty())
365365 else
366366 return MessageConverter .convertAmqpMessageToBrokeredMessage (c .toArray (new MessageWithDeliveryTag [0 ])[0 ]);
367367 });
368-
369- return this .filterLockExpiredMessage (receiveFuture , serverWaitTime );
370368 }
371369
372370 @ Override
@@ -376,7 +374,7 @@ public CompletableFuture<Collection<IMessage>> receiveBatchAsync(int maxMessageC
376374
377375 @ Override
378376 public CompletableFuture <Collection <IMessage >> receiveBatchAsync (int maxMessageCount , Duration serverWaitTime ) {
379- CompletableFuture < Collection < IMessage >> receiveFuture = this .internalReceiver .receiveAsync (maxMessageCount , serverWaitTime ).thenApplyAsync (c ->
377+ return this .internalReceiver .receiveAsync (maxMessageCount , serverWaitTime ).thenApplyAsync (c ->
380378 {
381379 if (c == null )
382380 return null ;
@@ -385,8 +383,7 @@ else if (c.isEmpty())
385383 else
386384 return convertAmqpMessagesWithDeliveryTagsToBrokeredMessages (c );
387385 });
388-
389- return this .filterLockExpiredMessages (receiveFuture , maxMessageCount , serverWaitTime );
386+
390387 }
391388
392389 @ Override
@@ -503,116 +500,6 @@ private void ensurePeekLockReceiveMode() {
503500 throw new UnsupportedOperationException ("Operations Complete/Abandon/DeadLetter/Defer cannot be called on a receiver opened in ReceiveAndDelete mode." );
504501 }
505502 }
506-
507- private boolean isMessageLockExpired (IMessage msg )
508- {
509- return msg .getLockedUntilUtc ().isBefore (Instant .now ());
510- }
511-
512- private CompletableFuture <IMessage > filterLockExpiredMessage (CompletableFuture <IMessage > receivedFuture , Duration serverWaitTime )
513- {
514- if (this .receiveMode == ReceiveMode .RECEIVEANDDELETE )
515- {
516- return receivedFuture ;
517- }
518- else
519- {
520- Instant startTime = Instant .now ();
521- return receivedFuture .thenCompose ((msg ) -> {
522- if (msg == null )
523- {
524- return receivedFuture ;
525- }
526- else
527- {
528- if (isMessageLockExpired (msg ))
529- {
530- // Message lock already expired. Receive another message
531- TRACE_LOGGER .warn ("Lock of the prefetched message with sequence number '{}' and id '{}' from '{}' already expired" , msg .getSequenceNumber (), msg .getMessageId (), this .getEntityPath ());
532- Duration remainingWaitTime = serverWaitTime .minus (Duration .between (startTime , Instant .now ()));
533- if (remainingWaitTime .isNegative () || remainingWaitTime .isZero ())
534- {
535- return CompletableFuture .completedFuture (null );
536- }
537- else
538- {
539- TRACE_LOGGER .debug ("Ignored the lock exipred message and receiving again from '{}'" , this .getEntityPath ());
540- return this .receiveAsync (remainingWaitTime );
541- }
542- }
543- else
544- {
545- return CompletableFuture .completedFuture (msg );
546- }
547- }
548- });
549- }
550- }
551-
552- private CompletableFuture <Collection <IMessage >> filterLockExpiredMessages (CompletableFuture <Collection <IMessage >> receivedFuture , int maxMessageCount , Duration serverWaitTime )
553- {
554- if (this .receiveMode == ReceiveMode .RECEIVEANDDELETE )
555- {
556- return receivedFuture ;
557- }
558- else
559- {
560- Instant startTime = Instant .now ();
561- return receivedFuture .thenCompose ((messages ) -> {
562- if (messages == null || messages .size () == 0 )
563- {
564- return receivedFuture ;
565- }
566- else
567- {
568- boolean areMessagesRemoved = false ;
569- Iterator <IMessage > msgIterator = messages .iterator ();
570- while (msgIterator .hasNext ())
571- {
572- IMessage msg = msgIterator .next ();
573- if (isMessageLockExpired (msg ))
574- {
575- // Message lock already expired. remove it
576- TRACE_LOGGER .warn ("Lock of the prefetched message with sequence number '{}' and id '{}' from '{}' already expired. Removing it from the list of messages returned to the caller." , msg .getSequenceNumber (), msg .getMessageId (), this .getEntityPath ());
577- msgIterator .remove ();
578- areMessagesRemoved = true ;
579- }
580- else
581- {
582- // Break the loop. As next messages in the list are received from the entity after current message and are definitely not expired. No need to check them
583- break ;
584- }
585- }
586-
587- if (areMessagesRemoved )
588- {
589- if (messages .size () > 0 )
590- {
591- // There are some messages still in the list.. Just return them to the caller
592- return CompletableFuture .completedFuture (messages );
593- }
594- else
595- {
596- Duration remainingWaitTime = serverWaitTime .minus (Duration .between (startTime , Instant .now ()));
597- if (remainingWaitTime .isNegative () || remainingWaitTime .isZero ())
598- {
599- return CompletableFuture .completedFuture (null );
600- }
601- else
602- {
603- TRACE_LOGGER .debug ("All messages in the received list are lock expired. So receiving again from '{}'" , this .getEntityPath ());
604- return this .receiveBatchAsync (maxMessageCount , remainingWaitTime );
605- }
606- }
607- }
608- else
609- {
610- return receivedFuture ;
611- }
612- }
613- });
614- }
615- }
616503
617504 private CompletableFuture <Boolean > checkIfValidRequestResponseLockTokenAsync (UUID lockToken ) {
618505 CompletableFuture <Boolean > future = new CompletableFuture <Boolean >();
0 commit comments