Skip to content

Commit 68d114b

Browse files
authored
Moving SAS token renewal part from MiscOpeationsHandler to RequestResponseLink so when connection is lost (#188)
and request-response link request is the first request on reconnection, SAS token is sent to service bus before attempting the request.
1 parent 9327038 commit 68d114b

File tree

5 files changed

+129
-113
lines changed

5 files changed

+129
-113
lines changed

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1142,7 +1142,7 @@ private synchronized CompletableFuture<Void> ensureLinkIsOpen()
11421142
TRACE_LOGGER.warn(operationTimedout.getMessage());
11431143
linkReopenFutureThatCanBeCancelled.completeExceptionally(operationTimedout);
11441144
}
1145-
}
1145+
}
11461146
, CoreMessageReceiver.LINK_REOPEN_TIMEOUT
11471147
, TimerType.OneTimeRun);
11481148
this.cancelSASTokenRenewTimer();

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ private CompletableFuture<Void> createCBSLinkAsync()
628628
String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath();
629629
TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath);
630630
CompletableFuture<Void> crateAndAssignRequestResponseLink =
631-
RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath).handleAsync((cbsLink, ex) ->
631+
RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null).handleAsync((cbsLink, ex) ->
632632
{
633633
if(ex == null)
634634
{

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java

Lines changed: 1 addition & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
package com.microsoft.azure.servicebus.primitives;
22

3-
import java.time.ZonedDateTime;
43
import java.util.*;
54
import java.util.concurrent.CompletableFuture;
6-
import java.util.concurrent.ScheduledFuture;
75

86
import org.apache.qpid.proton.amqp.DescribedType;
97
import org.apache.qpid.proton.message.Message;
@@ -18,63 +16,26 @@ public final class MiscRequestResponseOperationHandler extends ClientEntity
1816

1917
private final Object requestResonseLinkCreationLock = new Object();
2018
private final String entityPath;
21-
private final String sasTokenAudienceURI;
2219
private final MessagingFactory underlyingFactory;
2320
private RequestResponseLink requestResponseLink;
2421
private CompletableFuture<Void> requestResponseLinkCreationFuture;
25-
private ScheduledFuture<?> sasTokenRenewTimerFuture;
2622

2723
private MiscRequestResponseOperationHandler(MessagingFactory factory, String linkName, String entityPath)
2824
{
2925
super(linkName, factory);
3026

3127
this.underlyingFactory = factory;
3228
this.entityPath = entityPath;
33-
this.sasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), entityPath);
3429
}
3530

3631
public static CompletableFuture<MiscRequestResponseOperationHandler> create(MessagingFactory factory, String entityPath)
3732
{
38-
CompletableFuture<MiscRequestResponseOperationHandler> creationFuture = new CompletableFuture<MiscRequestResponseOperationHandler>();
3933
MiscRequestResponseOperationHandler requestResponseOperationHandler = new MiscRequestResponseOperationHandler(factory, StringUtil.getShortRandomString(), entityPath);
40-
requestResponseOperationHandler.sendTokenAndSetRenewTimer(false).handleAsync((v, ex) -> {
41-
if(ex == null)
42-
{
43-
TRACE_LOGGER.info("Opened MiscRequestResponseOperationHandler");
44-
creationFuture.complete(requestResponseOperationHandler);
45-
}
46-
else
47-
{
48-
TRACE_LOGGER.error("Opening of MiscRequestResponseOperationHandler failed", ex);
49-
creationFuture.completeExceptionally(ExceptionUtil.extractAsyncCompletionCause(ex));
50-
}
51-
return null;
52-
});
53-
54-
Timer.schedule(
55-
new Runnable()
56-
{
57-
public void run()
58-
{
59-
if (!creationFuture.isDone())
60-
{
61-
requestResponseOperationHandler.closeInternals();
62-
Exception operationTimedout = new TimeoutException(
63-
String.format(Locale.US, "Open operation on CBSLink(%s) on Entity(%s) timed out at %s.", requestResponseOperationHandler.getClientId(), requestResponseOperationHandler.entityPath, ZonedDateTime.now().toString()));
64-
TRACE_LOGGER.warn(operationTimedout.getMessage());
65-
66-
creationFuture.completeExceptionally(operationTimedout);
67-
}
68-
}
69-
}
70-
, factory.getOperationTimeout()
71-
, TimerType.OneTimeRun);
72-
return creationFuture;
34+
return CompletableFuture.completedFuture(requestResponseOperationHandler);
7335
}
7436

7537
private void closeInternals()
7638
{
77-
this.cancelSASTokenRenewTimer();
7839
this.closeRequestResponseLink();
7940
}
8041

@@ -85,28 +46,6 @@ protected CompletableFuture<Void> onClose() {
8546
return CompletableFuture.completedFuture(null);
8647
}
8748

88-
private CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean retryOnFailure)
89-
{
90-
if(this.getIsClosingOrClosed())
91-
{
92-
return CompletableFuture.completedFuture(null);
93-
}
94-
else
95-
{
96-
CompletableFuture<ScheduledFuture<?>> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
97-
return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f; TRACE_LOGGER.debug("Set SAS Token renew timer");});
98-
}
99-
}
100-
101-
private void cancelSASTokenRenewTimer()
102-
{
103-
if(this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone())
104-
{
105-
TRACE_LOGGER.debug("Cancelling SAS Token renew timer");
106-
this.sasTokenRenewTimerFuture.cancel(true);
107-
}
108-
}
109-
11049
private CompletableFuture<Void> createRequestResponseLink()
11150
{
11251
synchronized (this.requestResonseLinkCreationLock) {

0 commit comments

Comments
 (0)