Skip to content

Commit e7fcc0b

Browse files
authored
Fixing a memory leak in qpid layer. (#193)
* Fixing a memory leak in qpid layer. When service force closes an idle connetion, it is not properly freed in qpid layer. Includes some minor changes to make code simpler. * Fixing a code review comment.
1 parent 68d114b commit e7fcc0b

15 files changed

+108
-84
lines changed

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/InitializableEntity.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
abstract class InitializableEntity extends ClientEntity {
1616

1717
//TODO Init and close semantics are primitive now. Fix them with support for other states like Initializing, Closing, and concurrency.
18-
protected InitializableEntity(String clientId, ClientEntity parent) {
19-
super(clientId, parent);
18+
protected InitializableEntity(String clientId) {
19+
super(clientId);
2020
}
2121

2222
/**

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class MessageAndSessionPump extends InitializableEntity implements IMessageAndSe
4848
private int prefetchCount;
4949

5050
public MessageAndSessionPump(MessagingFactory factory, String entityPath, ReceiveMode receiveMode) {
51-
super(StringUtil.getShortRandomString(), null);
51+
super(StringUtil.getShortRandomString());
5252
this.factory = factory;
5353
this.entityPath = entityPath;
5454
this.receiveMode = receiveMode;
@@ -92,7 +92,7 @@ public void registerSessionHandler(ISessionHandler handler, SessionHandlerOption
9292
this.sessionHandlerOptions = handlerOptions;
9393

9494
for (int i = 0; i < handlerOptions.getMaxConcurrentSessions(); i++) {
95-
this.acceptSessionsAndPumpMessage();
95+
this.acceptSessionAndPumpMessages();
9696
}
9797
}
9898

@@ -142,6 +142,12 @@ private void receiveAndPumpMessage() {
142142
onMessageFuture = new CompletableFuture<Void>();
143143
onMessageFuture.completeExceptionally(onMessageSyncEx);
144144
}
145+
146+
// Some clients are returning null from the call
147+
if(onMessageFuture == null)
148+
{
149+
onMessageFuture = CompletableFuture.completedFuture(null);
150+
}
145151

146152
onMessageFuture.handleAsync((v, onMessageEx) -> {
147153
if (onMessageEx != null) {
@@ -203,7 +209,7 @@ private void receiveAndPumpMessage() {
203209
}
204210
}
205211

206-
private void acceptSessionsAndPumpMessage() {
212+
private void acceptSessionAndPumpMessages() {
207213
if (!this.getIsClosingOrClosed()) {
208214
TRACE_LOGGER.debug("Accepting a session from entity '{}'", this.entityPath);
209215
CompletableFuture<IMessageSession> acceptSessionFuture = ClientFactory.acceptSessionFromEntityPathAsync(this.factory, this.entityPath, null, this.receiveMode);
@@ -222,7 +228,7 @@ private void acceptSessionsAndPumpMessage() {
222228
// In case of any other exception, sleep and retry
223229
TRACE_LOGGER.debug("AcceptSession from entity '{}' will be retried after '{}'.", this.entityPath, SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION);
224230
Timer.schedule(() -> {
225-
MessageAndSessionPump.this.acceptSessionsAndPumpMessage();
231+
MessageAndSessionPump.this.acceptSessionAndPumpMessages();
226232
}, SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION, TimerType.OneTimeRun);
227233
}
228234
} else {
@@ -293,6 +299,12 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
293299
onMessageFuture.completeExceptionally(onMessageSyncEx);
294300
}
295301

302+
// Some clients are returning null from the call
303+
if(onMessageFuture == null)
304+
{
305+
onMessageFuture = CompletableFuture.completedFuture(null);
306+
}
307+
296308
onMessageFuture.handleAsync((v, onMessageEx) -> {
297309
renewCancelTimer.cancel(true);
298310
if (onMessageEx != null) {
@@ -419,6 +431,12 @@ synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException() {
419431
onCloseFuture = new CompletableFuture<Void>();
420432
onCloseFuture.completeExceptionally(onCloseSyncEx);
421433
}
434+
435+
// Some clients are returning null from the call
436+
if(onCloseFuture == null)
437+
{
438+
onCloseFuture = CompletableFuture.completedFuture(null);
439+
}
422440

423441
onCloseFuture.handleAsync((v, onCloseEx) -> {
424442
renewCancelTimer.cancel(true);
@@ -441,7 +459,7 @@ synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException() {
441459
}
442460

443461
this.messageAndSessionPump.openSessions.remove(this.session.getSessionId());
444-
this.messageAndSessionPump.acceptSessionsAndPumpMessage();
462+
this.messageAndSessionPump.acceptSessionAndPumpMessages();
445463
return null;
446464
});
447465
return null;

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class MessageReceiver extends InitializableEntity implements IMessageReceiver, I
5656
private final ConcurrentHashMap<UUID, Instant> requestResponseLockTokensToLockTimesMap;
5757

5858
private MessageReceiver(ReceiveMode receiveMode) {
59-
super(StringUtil.getShortRandomString(), null);
59+
super(StringUtil.getShortRandomString());
6060
this.receiveMode = receiveMode;
6161
this.requestResponseLockTokensToLockTimesMap = new ConcurrentHashMap<>();
6262
if (receiveMode == ReceiveMode.PEEKLOCK) {

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ final class MessageSender extends InitializableEntity implements IMessageSender
2929
private ClientSettings clientSettings;
3030

3131
private MessageSender() {
32-
super(StringUtil.getShortRandomString(), null);
32+
super(StringUtil.getShortRandomString());
3333
}
3434

3535
MessageSender(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings) {

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public final class QueueClient extends InitializableEntity implements IQueueClie
3636
private MiscRequestResponseOperationHandler miscRequestResponseHandler;
3737

3838
private QueueClient(ReceiveMode receiveMode, String queuePath) {
39-
super(StringUtil.getShortRandomString(), null);
39+
super(StringUtil.getShortRandomString());
4040
this.receiveMode = receiveMode;
4141
this.queuePath = queuePath;
4242
this.senderCreationLock = new Object();

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public final class SubscriptionClient extends InitializableEntity implements ISu
3838

3939
private SubscriptionClient(ReceiveMode receiveMode, String subscriptionPath)
4040
{
41-
super(StringUtil.getShortRandomString(), null);
41+
super(StringUtil.getShortRandomString());
4242
this.receiveMode = receiveMode;
4343
this.subscriptionPath = subscriptionPath;
4444
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public final class TopicClient extends InitializableEntity implements ITopicClie
2626
private MessageBrowser browser;
2727

2828
private TopicClient() {
29-
super(StringUtil.getShortRandomString(), null);
29+
super(StringUtil.getShortRandomString());
3030
}
3131

3232
public TopicClient(ConnectionStringBuilder amqpConnectionStringBuilder) throws InterruptedException, ServiceBusException {

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,6 @@ public void onConnectionBound(Event event)
6868
sasl.setMechanisms("ANONYMOUS");
6969
}
7070

71-
@Override
72-
public void onConnectionUnbound(Event event)
73-
{
74-
TRACE_LOGGER.debug("Connection.onConnectionUnbound: hostname:{}", event.getConnection().getHostname());
75-
}
76-
7771
@Override
7872
public void onTransportError(Event event)
7973
{
@@ -89,9 +83,10 @@ public void onTransportError(Event event)
8983

9084
this.messagingFactory.onConnectionError(condition);
9185
Connection connection = event.getConnection();
92-
if (connection != null) {
93-
connection.free();
94-
}
86+
if(connection != null)
87+
{
88+
connection.free();
89+
}
9590
}
9691

9792
@Override
@@ -108,38 +103,33 @@ public void onConnectionRemoteClose(Event event)
108103
final ErrorCondition error = connection.getRemoteCondition();
109104

110105
TRACE_LOGGER.debug("onConnectionRemoteClose: hostname:{},errorCondition:{}", connection.getHostname(), error != null ? error.getCondition() + "," + error.getDescription() : null);
111-
112-
if (connection.getRemoteState() != EndpointState.CLOSED)
106+
boolean shouldFreeConnection = connection.getLocalState() == EndpointState.CLOSED;
107+
this.messagingFactory.onConnectionError(error);
108+
if(shouldFreeConnection)
113109
{
114-
connection.close();
110+
connection.free();
115111
}
116-
117-
this.messagingFactory.onConnectionError(error);
118-
this.freeOnCloseResponse(connection);
119112
}
120113

121114
@Override
122115
public void onConnectionFinal(Event event) {
123-
final Transport transport = event.getTransport();
124-
if (transport != null) {
125-
transport.unbind();
126-
transport.free();
127-
}
116+
TRACE_LOGGER.debug("onConnectionFinal: hostname:{}", event.getConnection().getHostname());
128117
}
129118

130119
@Override
131120
public void onConnectionLocalClose(Event event) {
132121
Connection connection = event.getConnection();
133122
TRACE_LOGGER.debug("onConnectionLocalClose: hostname:{}", connection.getHostname());
134-
this.freeOnCloseResponse(connection);
135-
}
136-
137-
private void freeOnCloseResponse(Connection connection) {
138-
if (connection != null &&
139-
connection.getLocalState() == EndpointState.CLOSED &&
140-
(connection.getRemoteState() == EndpointState.CLOSED)) {
141-
connection.free();
142-
}
123+
if(connection.getRemoteState() == EndpointState.CLOSED)
124+
{
125+
// Service closed it first. In some such cases transport is not unbound and causing a leak.
126+
if(connection.getTransport() != null)
127+
{
128+
connection.getTransport().unbind();
129+
}
130+
131+
connection.free();
132+
}
143133
}
144134

145135
private static SslDomain makeDomain(SslDomain.Mode mode)

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/CustomIOHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,13 @@ public void onConnectionLocalOpen(Event event)
2424
transport.setEmitFlowEventOnSend(false);
2525
transport.bind(connection);
2626
}
27+
28+
@Override
29+
public void onTransportClosed(Event event)
30+
{
31+
if(event.getTransport() != null)
32+
{
33+
event.getTransport().unbind();
34+
}
35+
}
2736
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,13 @@ public abstract class ClientEntity
1818
{
1919
private final String clientId;
2020
private final Object syncClose;
21-
private final ClientEntity parent;
2221

2322
private boolean isClosing;
2423
private boolean isClosed;
2524

26-
protected ClientEntity(final String clientId, final ClientEntity parent)
25+
protected ClientEntity(final String clientId)
2726
{
2827
this.clientId = clientId;
29-
this.parent = parent;
30-
3128
this.syncClose = new Object();
3229
}
3330

@@ -39,21 +36,19 @@ public String getClientId()
3936
}
4037

4138
protected boolean getIsClosed()
42-
{
43-
final boolean isParentClosed = this.parent != null && this.parent.getIsClosed();
39+
{
4440
synchronized (this.syncClose)
4541
{
46-
return isParentClosed || this.isClosed;
42+
return this.isClosed;
4743
}
4844
}
49-
50-
// returns true even if the Parent is (being) Closed
45+
5146
protected boolean getIsClosingOrClosed()
5247
{
53-
final boolean isParentClosingOrClosed = this.parent != null && this.parent.getIsClosingOrClosed();
48+
5449
synchronized (this.syncClose)
5550
{
56-
return isParentClosingOrClosed || this.isClosing || this.isClosed;
51+
return this.isClosing || this.isClosed;
5752
}
5853
}
5954

0 commit comments

Comments
 (0)