Skip to content

Commit 136376b

Browse files
authored
Fixing high CPU and memory leak in session pump (#382)
* Fixing high CPU and memory leak in session pump caused by timed out accept session requests.
* Fixing a minor race condition when closing sender or receiver on link creation timeout.
1 parent 1f5138a commit 136376b

File tree

7 files changed

+42
-17
lines changed

7 files changed

+42
-17
lines changed

azure-servicebus/azure-servicebus.pom

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>com.microsoft.azure</groupId>
66
<artifactId>azure-servicebus</artifactId>
7-
<version>1.2.15</version>
7+
<version>1.2.16</version>
88
<licenses>
99
<license>
1010
<name>The MIT License (MIT)</name>

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void onLinkRemoteOpen(Event event)
6060
}
6161
else
6262
{
63-
TRACE_LOGGER.debug("onLinkRemoteOpen: linkName:{}, remoteTarget:{}, remoteTarget:{}, action:{}", receiver.getName(), null, null, "waitingForError");
63+
TRACE_LOGGER.debug("onLinkRemoteOpen: linkName:{}, remoteSource:{}, remoteTarget:{}, action:{}", receiver.getName(), null, null, "waitingForError");
6464
}
6565
}
6666
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,20 @@ protected final void setClosed()
5757
{
5858
synchronized (this.syncClose)
5959
{
60+
this.isClosing = false;
6061
this.isClosed = true;
6162
}
6263
}
64+
65+
protected final void setClosing()
66+
{
67+
synchronized (this.syncClose)
68+
{
69+
if (!this.isClosed) {
70+
this.isClosing = true;
71+
}
72+
}
73+
}
6374

6475
public final CompletableFuture<Void> closeAsync()
6576
{

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,10 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver,
105105
private CompletableFuture<Void> ensureLinkReopenFutureToWaitOn;
106106
private final Runnable timedOutUpdateStateRequestsDaemon;
107107
private final Runnable returnMesagesLoopDaemon;
108-
private final ScheduledFuture<?> updateStateRequestsTimeoutChecker;
109-
private final ScheduledFuture<?> returnMessagesLoopRunner;
110108
private final MessagingEntityType entityType;
111109
private boolean shouldRetryLinkReopenOnTransientFailure = true;
110+
private ScheduledFuture<?> updateStateRequestsTimeoutChecker;
111+
private ScheduledFuture<?> returnMessagesLoopRunner;
112112

113113
// TODO Change onReceiveComplete to handle empty deliveries. Change onError to retry updateState requests.
114114
private CoreMessageReceiver(final MessagingFactory factory,
@@ -151,6 +151,12 @@ private CoreMessageReceiver(final MessagingFactory factory,
151151
public void run() {
152152
try
153153
{
154+
if (CoreMessageReceiver.this.getIsClosed())
155+
{
156+
CoreMessageReceiver.this.updateStateRequestsTimeoutChecker.cancel(true);
157+
return;
158+
}
159+
154160
TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to complete timed out update state requests.", CoreMessageReceiver.this.receivePath);
155161
for(Map.Entry<String, UpdateStateWorkItem> entry : CoreMessageReceiver.this.pendingUpdateStateRequests.entrySet())
156162
{
@@ -182,6 +188,12 @@ public void run() {
182188
public void run() {
183189
try
184190
{
191+
if (CoreMessageReceiver.this.getIsClosed())
192+
{
193+
CoreMessageReceiver.this.returnMessagesLoopRunner.cancel(true);
194+
return;
195+
}
196+
185197
TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to return messages to waiting clients.", CoreMessageReceiver.this.receivePath);
186198
while(!CoreMessageReceiver.this.prefetchedMessages.isEmpty())
187199
{
@@ -210,11 +222,6 @@ public void run() {
210222
}
211223
}
212224
};
213-
214-
// As all update state requests have the same timeout, one timer is better than having one timer per request
215-
this.updateStateRequestsTimeoutChecker = Timer.schedule(timedOutUpdateStateRequestsDaemon, CoreMessageReceiver.UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
216-
// Scheduling it as a separate thread that wakes up at regular very short intervals.. Doesn't wait on incoming receive requests from callers or incoming deliveries from reactor
217-
this.returnMessagesLoopRunner = Timer.schedule(returnMesagesLoopDaemon, CoreMessageReceiver.RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
218225
}
219226

220227
// Connection has to be associated with Reactor before Creating a receiver on it.
@@ -636,6 +643,11 @@ public void onOpenComplete(Exception exception)
636643
if (this.linkOpen != null && !this.linkOpen.getWork().isDone())
637644
{
638645
AsyncUtil.completeFuture(this.linkOpen.getWork(), this);
646+
647+
// As all update state requests have the same timeout, one timer is better than having one timer per request
648+
this.updateStateRequestsTimeoutChecker = Timer.schedule(timedOutUpdateStateRequestsDaemon, CoreMessageReceiver.UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
649+
// Scheduling it as a separate thread that wakes up at regular very short intervals.. Doesn't wait on incoming receive requests from callers or incoming deliveries from reactor
650+
this.returnMessagesLoopRunner = Timer.schedule(returnMesagesLoopDaemon, CoreMessageReceiver.RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
639651
}
640652

641653
if(this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone())
@@ -924,14 +936,15 @@ public void run()
924936
{
925937
if (!linkOpen.getWork().isDone())
926938
{
927-
CoreMessageReceiver.this.closeInternals(false);
928-
CoreMessageReceiver.this.setClosed();
929-
930939
Exception operationTimedout = new TimeoutException(
931940
String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()),
932941
CoreMessageReceiver.this.lastKnownLinkError);
933942
TRACE_LOGGER.warn(operationTimedout.getMessage());
934943
ExceptionUtil.completeExceptionally(linkOpen.getWork(), operationTimedout, CoreMessageReceiver.this, true);
944+
945+
CoreMessageReceiver.this.setClosing();
946+
CoreMessageReceiver.this.closeInternals(false);
947+
CoreMessageReceiver.this.setClosed();
935948
}
936949
}
937950
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -675,14 +675,15 @@ public void run()
675675
{
676676
if (!CoreMessageSender.this.linkFirstOpen.isDone())
677677
{
678-
CoreMessageSender.this.closeInternals(false);
679-
CoreMessageSender.this.setClosed();
680-
681678
Exception operationTimedout = new TimeoutException(
682679
String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.getSendPath(), ZonedDateTime.now().toString()),
683680
CoreMessageSender.this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)) ? CoreMessageSender.this.lastKnownLinkError : null);
684681
TRACE_LOGGER.warn(operationTimedout.getMessage());
685682
ExceptionUtil.completeExceptionally(CoreMessageSender.this.linkFirstOpen, operationTimedout, CoreMessageSender.this, true);
683+
684+
CoreMessageSender.this.setClosing();
685+
CoreMessageSender.this.closeInternals(false);
686+
CoreMessageSender.this.setClosed();
686687
}
687688
}
688689
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ static int encodeMessageToCustomArray(Message message, byte[] encodedBytes, int
385385
// Pass little less than client timeout to the server so client doesn't time out before server times out
386386
static Duration adjustServerTimeout(Duration clientTimeout)
387387
{
388-
return clientTimeout.minusMillis(100);
388+
return clientTimeout.minusMillis(200);
389389
}
390390

391391
// This is not super stable for some reason

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<proton-j-version>0.31.0</proton-j-version>
1414
<junit-version>4.12</junit-version>
1515
<slf4j-version>1.7.0</slf4j-version>
16-
<client-current-version>1.2.15</client-current-version>
16+
<client-current-version>1.2.16</client-current-version>
1717
</properties>
1818

1919
<modules>

0 commit comments

Comments
 (0)