Skip to content

Commit 650ee00

Browse files
authored
Fixing a rare case where idle connection is closed by the service at the exact time the client is creating a link. (#373)
If the connection is closed by service after CBS token is sent, link open operaiton was failing with AuthorizsationFailed exception.
1 parent 4d760c4 commit 650ee00

File tree

6 files changed

+109
-12
lines changed

6 files changed

+109
-12
lines changed

azure-servicebus/azure-servicebus.pom

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>com.microsoft.azure</groupId>
66
<artifactId>azure-servicebus</artifactId>
7-
<version>1.2.14</version>
7+
<version>1.2.15</version>
88
<licenses>
99
<license>
1010
<name>The MIT License (MIT)</name>

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,13 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver,
102102
private ScheduledFuture<?> sasTokenRenewTimerFuture;
103103
private CompletableFuture<Void> requestResponseLinkCreationFuture;
104104
private CompletableFuture<Void> receiveLinkReopenFuture;
105+
private CompletableFuture<Void> ensureLinkReopenFutureToWaitOn;
105106
private final Runnable timedOutUpdateStateRequestsDaemon;
106107
private final Runnable returnMesagesLoopDaemon;
107108
private final ScheduledFuture<?> updateStateRequestsTimeoutChecker;
108109
private final ScheduledFuture<?> returnMessagesLoopRunner;
109110
private final MessagingEntityType entityType;
111+
private boolean shouldRetryLinkReopenOnTransientFailure = true;
110112

111113
// TODO Change onReceiveComplete to handle empty deliveries. Change onError to retry updateState requests.
112114
private CoreMessageReceiver(final MessagingFactory factory,
@@ -370,8 +372,25 @@ private void closeRequestResponseLink()
370372
private void createReceiveLink()
371373
{
372374
TRACE_LOGGER.info("Creating receive link to '{}'", this.receivePath);
373-
Connection connection = this.underlyingFactory.getConnection();
374-
375+
Connection connection = this.underlyingFactory.getActiveConnectionOrNothing();
376+
if (connection == null) {
377+
// Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending
378+
// CBS token but before opening a link.
379+
TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
380+
ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
381+
if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
382+
// Should never happen
383+
AsyncUtil.completeFutureExceptionally(this.linkOpen.getWork(), exception);
384+
}
385+
386+
if(this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
387+
// Complete the future and re-attempt link creation
388+
AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception);
389+
}
390+
391+
return;
392+
}
393+
375394
final Session session = connection.session();
376395
session.setIncomingCapacity(Integer.MAX_VALUE);
377396
session.open();
@@ -877,7 +896,7 @@ public void onEvent()
877896
catch (IOException ioException)
878897
{
879898
this.pendingReceives.remove(receiveWorkItem);
880-
this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
899+
this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
881900
receiveWorkItem.getWork().completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException));
882901
receiveWorkItem.cancelTimeoutTask(false);
883902
}
@@ -1191,7 +1210,33 @@ public void onEvent()
11911210
}, MessagingFactory.INTERNAL_THREAD_POOL);
11921211
}
11931212

1194-
return this.receiveLinkReopenFuture;
1213+
if (this.ensureLinkReopenFutureToWaitOn == null || this.ensureLinkReopenFutureToWaitOn.isDone()) {
1214+
this.ensureLinkReopenFutureToWaitOn = new CompletableFuture<Void>();
1215+
this.shouldRetryLinkReopenOnTransientFailure = true;
1216+
}
1217+
1218+
this.receiveLinkReopenFuture.handleAsync((v, ex) -> {
1219+
if (ex == null) {
1220+
this.ensureLinkReopenFutureToWaitOn.complete(null);
1221+
} else {
1222+
if (ex instanceof ServiceBusException && ((ServiceBusException)ex).getIsTransient()) {
1223+
if (this.shouldRetryLinkReopenOnTransientFailure) {
1224+
// Retry link creation
1225+
this.shouldRetryLinkReopenOnTransientFailure = false;
1226+
this.ensureLinkIsOpen();
1227+
} else {
1228+
this.ensureLinkReopenFutureToWaitOn.completeExceptionally(ex);
1229+
}
1230+
} else {
1231+
this.ensureLinkReopenFutureToWaitOn.completeExceptionally(ex);
1232+
}
1233+
1234+
}
1235+
return null;
1236+
},
1237+
MessagingFactory.INTERNAL_THREAD_POOL);
1238+
1239+
return this.ensureLinkReopenFutureToWaitOn;
11951240
}
11961241
else
11971242
{

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErr
8686
private CompletableFuture<Void> requestResponseLinkCreationFuture;
8787
private CompletableFuture<Void> sendLinkReopenFuture;
8888
private int maxMessageSize;
89+
private boolean shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;
8990

9091
@Deprecated
9192
public static CompletableFuture<CoreMessageSender> create(
@@ -375,6 +376,7 @@ public CompletableFuture<Void> sendAsync(Message msg)
375376
@Override
376377
public void onOpenComplete(Exception completionException)
377378
{
379+
this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;
378380
if (completionException == null)
379381
{
380382
this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink);
@@ -576,13 +578,35 @@ private void cleanupFailedSend(final SendWorkItem<Void> failedSend, final Throwa
576578
private void createSendLink()
577579
{
578580
TRACE_LOGGER.info("Creating send link to '{}'", this.sendPath);
579-
final Connection connection = this.underlyingFactory.getConnection();
581+
Connection connection = this.underlyingFactory.getActiveConnectionOrNothing();
582+
if (connection == null) {
583+
// Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending
584+
// CBS token but before opening a link.
585+
TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
586+
ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
587+
if (this.linkFirstOpen != null && !this.linkFirstOpen.isDone()) {
588+
// Should never happen
589+
AsyncUtil.completeFutureExceptionally(this.linkFirstOpen, exception);
590+
}
591+
592+
if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
593+
// Complete the future and re-attempt link creation
594+
AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, exception);
595+
if(this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent) {
596+
this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = false;
597+
Timer.schedule(() -> {this.ensureLinkIsOpen();}, Duration.ZERO, TimerType.OneTimeRun);
598+
}
599+
}
600+
601+
return;
602+
}
603+
580604
final Session session = connection.session();
581605
session.setOutgoingWindow(Integer.MAX_VALUE);
582606
session.open();
583607
BaseHandler.setHandler(session, new SessionHandler(sendPath));
584608

585-
final String sendLinkNamePrefix = StringUtil.getShortRandomString();
609+
final String sendLinkNamePrefix = "sender".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
586610
final String sendLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ?
587611
sendLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()) :
588612
sendLinkNamePrefix;

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,17 @@ private void startReactor(ReactorHandler reactorHandler) throws IOException
140140
TRACE_LOGGER.info("Started reactor");
141141
}
142142

143-
Connection getConnection()
143+
Connection getActiveConnectionOrNothing()
144+
{
145+
if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
146+
return null;
147+
}
148+
else {
149+
return this.connection;
150+
}
151+
}
152+
153+
Connection getActiveConnectionCreateIfNecessary()
144154
{
145155
if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED)
146156
{
@@ -667,7 +677,7 @@ private void createCBSLinkAsync()
667677
this.cbsLinkCreationFuture.completeExceptionally(completionEx);
668678
}
669679
else
670-
{
680+
{
671681
String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath();
672682
TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath);
673683
RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null).handleAsync((cbsLink, ex) ->

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,27 @@ private void createInternalLinks()
239239
commonLinkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, this.entityType.getIntValue());
240240
}
241241

242-
// Create send link
243-
final Connection connection = this.underlyingFactory.getConnection();
242+
Connection connection;
243+
if(this.sasTokenAudienceURI == null) {
244+
// CBS link. Doesn't have to send SAS token
245+
connection = this.underlyingFactory.getActiveConnectionCreateIfNecessary();
246+
}
247+
else {
248+
connection = this.underlyingFactory.getActiveConnectionOrNothing();
249+
if (connection == null) {
250+
// Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending
251+
// CBS token but before opening a link.
252+
TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
253+
ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
254+
AsyncUtil.completeFutureExceptionally(this.amqpSender.openFuture, exception);
255+
AsyncUtil.completeFutureExceptionally(this.amqpReceiver.openFuture, exception);
256+
// Retry with little delay so that link recreation in progress flag is reset
257+
Timer.schedule(() -> {RequestResponseLink.this.ensureUniqueLinkRecreation();}, Duration.ofMillis(1000), TimerType.OneTimeRun);
258+
return;
259+
}
260+
}
244261

262+
// Create send link
245263
Session session = connection.session();
246264
session.setOutgoingWindow(Integer.MAX_VALUE);
247265
session.open();

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<proton-j-version>0.31.0</proton-j-version>
1414
<junit-version>4.12</junit-version>
1515
<slf4j-version>1.7.0</slf4j-version>
16-
<client-current-version>1.2.14</client-current-version>
16+
<client-current-version>1.2.15</client-current-version>
1717
</properties>
1818

1919
<modules>

0 commit comments

Comments
 (0)