Skip to content

Commit 6dfd93d

Browse files
yvgopalbinzywu
authored andcommitted
Fixing AuthorizationFailed exceptions coming in stress (#88)
* Fixing javadoc generation errors * Adding dependency to use SLF4J logging instead of java util logging. * More SLF4J migration * Changed all java util logging statements to slf4j. * Checking in the POM that goes with the release to maven reposisory. * Removed unused imports * Instrumented all code with SLF4J logging. * Fixing some log statements * Moving request-response link creation to message factory so we don't create multiple links to the same entity. * Updaing third party notice to include SLF4J license. * Updating distributed POM * Renaming setContent method to setBody. * Fixing a typo in log statement. * Handling the special case of 0 timeout for receives. If there are no prefetched messages, receive call returns immediately with already prefetched messages. * Correcting a log statement * Added javadoc comments for rules package. * Adding javadoc comments. Work in progress. * More javadoc comments added. * Fixing a minor bug in sender and receiver creation. Closes the messaging factory if sender or receiver creation fails. * Fixing the issue of not renewing CBS tokens. Also a concurrency fix. * Fixing a bug in session receiver to not repeatedly attempt accepting the same session on lock lost. * Added some java docs.. And changed default prefetch count, based on receive mode. * Fixing a thread unending wait bug in request-response link. * Fixing AuthorizationFailed exceptions that pop when the conncetion is recreated. * Minor tweaks
1 parent 7d3cde9 commit 6dfd93d

File tree

8 files changed

+427
-258
lines changed

8 files changed

+427
-258
lines changed

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ private ClientConstants() { }
164164

165165
static final String SAS_TOKEN_TYPE = "servicebus.windows.net:sastoken";
166166
static final int DEFAULT_SAS_TOKEN_VALIDITY_IN_SECONDS = 20*60; // 20 minutes
167+
static final int DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS = 5;
167168
static final String SAS_TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";
168169

169170
private static String getPlatformInfo() {

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java

Lines changed: 240 additions & 175 deletions
Large diffs are not rendered by default.

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java

Lines changed: 94 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErr
5959
{
6060
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageSender.class);
6161
private static final String SEND_TIMED_OUT = "Send operation timed out";
62+
private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5); // service closes link long before this timeout expires
6263

6364
private final Object requestResonseLinkCreationLock = new Object();
6465
private final MessagingFactory underlyingFactory;
@@ -80,6 +81,7 @@ public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErr
8081
private Instant lastKnownErrorReportedAt;
8182
private ScheduledFuture<?> sasTokenRenewTimerFuture;
8283
private CompletableFuture<Void> requestResponseLinkCreationFuture;
84+
private CompletableFuture<Void> sendLinkReopenFuture;
8385

8486
public static CompletableFuture<CoreMessageSender> create(
8587
final MessagingFactory factory,
@@ -91,7 +93,7 @@ public static CompletableFuture<CoreMessageSender> create(
9193
TimeoutTracker openLinkTracker = TimeoutTracker.create(factory.getOperationTimeout());
9294
msgSender.initializeLinkOpen(openLinkTracker);
9395

94-
msgSender.sendSASTokenAndSetRenewTimer().handleAsync((v, sasTokenEx) -> {
96+
msgSender.sendSASTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> {
9597
if(sasTokenEx != null)
9698
{
9799
Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx);
@@ -176,6 +178,7 @@ private CoreMessageSender(final MessagingFactory factory, final String sendLinkN
176178
this.linkCredit = 0;
177179

178180
this.linkClose = new CompletableFuture<Void>();
181+
this.sendLinkReopenFuture = null;
179182

180183
this.sendWork = new DispatchHandler()
181184
{
@@ -337,13 +340,19 @@ public void onOpenComplete(Exception completionException)
337340
this.lastKnownLinkError = null;
338341
this.retryPolicy.resetRetryCount(this.getClientId());
339342

343+
if(this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone())
344+
{
345+
AsyncUtil.completeFuture(this.sendLinkReopenFuture, null);
346+
this.sendLinkReopenFuture = null;
347+
}
348+
340349
if (!this.linkFirstOpen.isDone())
341350
{
342351
TRACE_LOGGER.info("Opened send link to '{}'", this.sendPath);
343352
AsyncUtil.completeFuture(this.linkFirstOpen, this);
344353
}
345354
else
346-
{
355+
{
347356
synchronized (this.pendingSendLock)
348357
{
349358
if (!this.pendingSendsData.isEmpty())
@@ -370,14 +379,20 @@ public void onOpenComplete(Exception completionException)
370379
}
371380
}
372381
else
373-
{
382+
{
383+
TRACE_LOGGER.error("Opending send link to '{}' failed", this.sendPath, completionException);
384+
this.cancelSASTokenRenewTimer();
374385
if (!this.linkFirstOpen.isDone())
375-
{
376-
TRACE_LOGGER.error("Opending send link to '{}' failed", this.sendPath, completionException);
377-
this.setClosed();
378-
this.cancelSASTokenRenewTimer();
386+
{
387+
this.setClosed();
379388
ExceptionUtil.completeExceptionally(this.linkFirstOpen, completionException, this, true);
380389
}
390+
391+
if(this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone())
392+
{
393+
AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, completionException);
394+
this.sendLinkReopenFuture = null;
395+
}
381396
}
382397
}
383398

@@ -449,23 +464,7 @@ public void onError(Exception completionException)
449464
if (nextRetryInterval != null)
450465
{
451466
TRACE_LOGGER.warn("Send link to '{}' closed. Will retry link creation after '{}'.", this.sendPath, nextRetryInterval);
452-
try
453-
{
454-
this.underlyingFactory.scheduleOnReactorThread((int) nextRetryInterval.toMillis(), new DispatchHandler()
455-
{
456-
@Override
457-
public void onEvent()
458-
{
459-
if (sendLink.getLocalState() == EndpointState.CLOSED || sendLink.getRemoteState() == EndpointState.CLOSED)
460-
{
461-
CoreMessageSender.this.recreateSendLink();
462-
}
463-
}
464-
});
465-
}
466-
catch (IOException ignore)
467-
{
468-
}
467+
Timer.schedule(() -> {CoreMessageSender.this.ensureLinkIsOpen();}, nextRetryInterval, TimerType.OneTimeRun);
469468
}
470469
}
471470
}
@@ -616,15 +615,16 @@ private void createSendLink()
616615
this.sendLink = sender;
617616
}
618617

619-
CompletableFuture<Void> sendSASTokenAndSetRenewTimer()
618+
CompletableFuture<Void> sendSASTokenAndSetRenewTimer(boolean retryOnFailure)
620619
{
621620
if(this.getIsClosingOrClosed())
622621
{
623622
return CompletableFuture.completedFuture(null);
624623
}
625624
else
626625
{
627-
CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSASTokenAndSetRenewTimer(this.sasTokenAudienceURI, () -> this.sendSASTokenAndSetRenewTimer());
626+
this.cancelSASTokenRenewTimer();
627+
CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSASTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendSASTokenAndSetRenewTimer(true));
628628
return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f; TRACE_LOGGER.debug("Sent SAS Token and set renew timer");});
629629
}
630630
}
@@ -696,26 +696,79 @@ public void onFlow(final int creditIssued)
696696
this.linkCredit = this.linkCredit + creditIssued;
697697
this.sendWork.onEvent();
698698
}
699-
700-
private void recreateSendLink()
701-
{
702-
TRACE_LOGGER.info("Recreating send link to '{}'", this.sendPath);
703-
this.createSendLink();
704-
this.retryPolicy.incrementRetryCount(CoreMessageSender.this.getClientId());
705-
}
699+
700+
private synchronized CompletableFuture<Void> ensureLinkIsOpen()
701+
{
702+
// Send SAS token before opening a link as connection might have been closed and reopened
703+
if (this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED)
704+
{
705+
if(this.sendLinkReopenFuture == null)
706+
{
707+
TRACE_LOGGER.info("Recreating send link to '{}'", this.sendPath);
708+
this.retryPolicy.incrementRetryCount(CoreMessageSender.this.getClientId());
709+
this.sendLinkReopenFuture = new CompletableFuture<Void>();
710+
// Variable just to closed over by the scheduled runnable. The runnable should cancel only the closed over future, not the parent's instance variable which can change
711+
final CompletableFuture<Void> linkReopenFutureThatCanBeCancelled = this.sendLinkReopenFuture;
712+
Timer.schedule(
713+
() -> {
714+
if (!linkReopenFutureThatCanBeCancelled.isDone())
715+
{
716+
CoreMessageSender.this.cancelSASTokenRenewTimer();
717+
Exception operationTimedout = new TimeoutException(
718+
String.format(Locale.US, "%s operation on SendLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.sendPath, ZonedDateTime.now()));
719+
720+
TRACE_LOGGER.warn(operationTimedout.getMessage());
721+
linkReopenFutureThatCanBeCancelled.completeExceptionally(operationTimedout);
722+
}
723+
}
724+
, CoreMessageSender.LINK_REOPEN_TIMEOUT
725+
, TimerType.OneTimeRun);
726+
this.sendSASTokenAndSetRenewTimer(false).handleAsync((v, sendTokenEx) -> {
727+
if(sendTokenEx != null)
728+
{
729+
this.sendLinkReopenFuture.completeExceptionally(sendTokenEx);
730+
}
731+
else
732+
{
733+
try
734+
{
735+
this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler()
736+
{
737+
@Override
738+
public void onEvent()
739+
{
740+
CoreMessageSender.this.createSendLink();
741+
}
742+
});
743+
}
744+
catch (IOException ioEx)
745+
{
746+
this.sendLinkReopenFuture.completeExceptionally(ioEx);
747+
}
748+
}
749+
return null;
750+
});
751+
}
752+
753+
return this.sendLinkReopenFuture;
754+
}
755+
else
756+
{
757+
return CompletableFuture.completedFuture(null);
758+
}
759+
}
706760

707761
// actual send on the SenderLink should happen only in this method & should run on Reactor Thread
708762
private void processSendWork()
709763
{
710764
TRACE_LOGGER.debug("Processing pending sends to '{}'. Available link credit '{}'", this.sendPath, this.linkCredit);
711-
final Sender sendLinkCurrent = this.sendLink;
712-
713-
if (sendLinkCurrent.getLocalState() == EndpointState.CLOSED || sendLinkCurrent.getRemoteState() == EndpointState.CLOSED)
714-
{
715-
this.recreateSendLink();
716-
return;
717-
}
718-
765+
if(!this.ensureLinkIsOpen().isDone())
766+
{
767+
// Link recreation is pending
768+
return;
769+
}
770+
771+
final Sender sendLinkCurrent = this.sendLink;
719772
while (sendLinkCurrent != null
720773
&& sendLinkCurrent.getLocalState() == EndpointState.ACTIVE && sendLinkCurrent.getRemoteState() == EndpointState.ACTIVE
721774
&& this.linkCredit > 0)

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -437,16 +437,16 @@ public void run()
437437
this.rctr.setTimeout(3141);
438438
this.rctr.start();
439439
boolean continuteProcessing = true;
440-
while(!Thread.interrupted() && continuteProcessing)
441-
{
442-
// If factory is closed, stop reactor too
443-
if(MessagingFactory.this.getIsClosed())
444-
{
445-
TRACE_LOGGER.info("Gracefully releasing reactor thread as messaging factory is closed");
446-
break;
447-
}
448-
continuteProcessing = this.rctr.process();
449-
}
440+
while(!Thread.interrupted() && continuteProcessing)
441+
{
442+
// If factory is closed, stop reactor too
443+
if(MessagingFactory.this.getIsClosed())
444+
{
445+
TRACE_LOGGER.info("Gracefully releasing reactor thread as messaging factory is closed");
446+
break;
447+
}
448+
continuteProcessing = this.rctr.process();
449+
}
450450
TRACE_LOGGER.info("Stopping reactor");
451451
this.rctr.stop();
452452
}
@@ -514,7 +514,7 @@ void scheduleOnReactorThread(final int delay, final DispatchHandler handler) thr
514514
this.getReactorScheduler().invoke(delay, handler);
515515
}
516516

517-
CompletableFuture<ScheduledFuture<?>> sendSASTokenAndSetRenewTimer(String sasTokenAudienceURI, Runnable validityRenewer)
517+
CompletableFuture<ScheduledFuture<?>> sendSASTokenAndSetRenewTimer(String sasTokenAudienceURI, boolean retryOnFailure, Runnable validityRenewer)
518518
{
519519
TRACE_LOGGER.debug("Sending CBS Token for {}", sasTokenAudienceURI);
520520
boolean isSasTokenGenerated = false;
@@ -538,22 +538,48 @@ CompletableFuture<ScheduledFuture<?>> sendSASTokenAndSetRenewTimer(String sasTok
538538
CompletableFuture<Void> sendTokenFuture = this.cbsLinkCreationFuture.thenComposeAsync((v) -> {
539539
return CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.operationTimeout), finalSasToken, ClientConstants.SAS_TOKEN_TYPE, sasTokenAudienceURI);
540540
});
541-
return sendTokenFuture.thenApplyAsync((v) -> {
542-
TRACE_LOGGER.debug("Sent CBS Token for {}", sasTokenAudienceURI);
543-
if(finalIsSasTokenGenerated)
544-
{
545-
// It will eventually expire. Renew it
546-
int renewInterval = SASUtil.getCBSTokenRenewIntervalInSeconds(ClientConstants.DEFAULT_SAS_TOKEN_VALIDITY_IN_SECONDS);
547-
return Timer.schedule(validityRenewer, Duration.ofSeconds(renewInterval), TimerType.OneTimeRun);
548-
}
549-
else
550-
{
551-
// User provided signature. We can't renew it.
552-
return null;
553-
}
554-
});
541+
542+
543+
if(retryOnFailure)
544+
{
545+
return sendTokenFuture.handleAsync((v, sendTokenEx) -> {
546+
if(sendTokenEx == null)
547+
{
548+
return MessagingFactory.scheduleRenewTimer(finalIsSasTokenGenerated, sasTokenAudienceURI, validityRenewer);
549+
}
550+
else
551+
{
552+
TRACE_LOGGER.error("Sending CBS Token for {} failed.", sasTokenAudienceURI, sendTokenEx);
553+
TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", sasTokenAudienceURI, ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS);
554+
return Timer.schedule(validityRenewer, Duration.ofSeconds(ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS), TimerType.OneTimeRun);
555+
}
556+
});
557+
}
558+
else
559+
{
560+
// Let the exception of the sendToken state pass up to caller
561+
return sendTokenFuture.thenApply((v) -> {
562+
return MessagingFactory.scheduleRenewTimer(finalIsSasTokenGenerated, sasTokenAudienceURI, validityRenewer);
563+
});
564+
}
555565
}
556566

567+
private static ScheduledFuture<?> scheduleRenewTimer(boolean isSasTokenGenerated, String sasTokenAudienceURI, Runnable validityRenewer)
568+
{
569+
TRACE_LOGGER.debug("Sent CBS Token for {}", sasTokenAudienceURI);
570+
if(isSasTokenGenerated)
571+
{
572+
// It will eventually expire. Renew it
573+
int renewInterval = SASUtil.getCBSTokenRenewIntervalInSeconds(ClientConstants.DEFAULT_SAS_TOKEN_VALIDITY_IN_SECONDS);
574+
return Timer.schedule(validityRenewer, Duration.ofSeconds(renewInterval), TimerType.OneTimeRun);
575+
}
576+
else
577+
{
578+
// User provided signature. We can't renew it.
579+
return null;
580+
}
581+
}
582+
557583
CompletableFuture<RequestResponseLink> obtainRequestResponseLinkAsync(String entityPath)
558584
{
559585
this.throwIfClosed(null);

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public static CompletableFuture<MiscRequestResponseOperationHandler> create(Mess
3737
{
3838
CompletableFuture<MiscRequestResponseOperationHandler> creationFuture = new CompletableFuture<MiscRequestResponseOperationHandler>();
3939
MiscRequestResponseOperationHandler requestResponseOperationHandler = new MiscRequestResponseOperationHandler(factory, StringUtil.getShortRandomString(), entityPath);
40-
requestResponseOperationHandler.sendSASTokenAndSetRenewTimer().handleAsync((v, ex) -> {
40+
requestResponseOperationHandler.sendSASTokenAndSetRenewTimer(false).handleAsync((v, ex) -> {
4141
if(ex == null)
4242
{
4343
TRACE_LOGGER.info("Opened MiscRequestResponseOperationHandler");
@@ -80,15 +80,16 @@ protected CompletableFuture<Void> onClose() {
8080
return CompletableFuture.completedFuture(null);
8181
}
8282

83-
private CompletableFuture<Void> sendSASTokenAndSetRenewTimer()
83+
private CompletableFuture<Void> sendSASTokenAndSetRenewTimer(boolean retryOnFailure)
8484
{
8585
if(this.getIsClosingOrClosed())
8686
{
8787
return CompletableFuture.completedFuture(null);
8888
}
8989
else
9090
{
91-
CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSASTokenAndSetRenewTimer(this.sasTokenAudienceURI, () -> this.sendSASTokenAndSetRenewTimer());
91+
this.cancelSASTokenRenewTimer();
92+
CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSASTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendSASTokenAndSetRenewTimer(true));
9293
return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f; TRACE_LOGGER.debug("Set SAS Token renew timer");});
9394
}
9495
}

0 commit comments

Comments
 (0)