Skip to content

Commit 15f4ed6

Browse files
authored
Porting bug fixes from dev branch (#195)
* ReceiveLink idle close fix (#186) * Sending extra application property in request message to associate a request on request-response link with a sender or rceiver. * Correcting a typo * Moving SAS token renewal part from MiscOpeationsHandler to RequestResponseLink so when connection is lost (#188) and request-response link request is the first request on reconnection, SAS token is sent to service bus before attempting the request. # Conflicts: # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java * Fixing a memory leak in qpid layer. (#193) * Fixing a memory leak in qpid layer. When service force closes an idle connetion, it is not properly freed in qpid layer. Includes some minor changes to make code simpler. * Fixing a code review comment. # Conflicts: # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java * Changing version. * Resolving a merge conflict.
1 parent f182602 commit 15f4ed6

22 files changed

+269
-204
lines changed

azure-servicebus/azure-servicebus.pom

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>com.microsoft.azure</groupId>
66
<artifactId>azure-servicebus</artifactId>
7-
<version>1.1.1</version>
7+
<version>1.1.2</version>
88
<licenses>
99
<license>
1010
<name>The MIT License (MIT)</name>

azure-servicebus/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<parent>
1010
<groupId>com.microsoft.azure</groupId>
1111
<artifactId>azure-servicebus-parent</artifactId>
12-
<version>1.1.1</version>
12+
<version>1.1.2</version>
1313
</parent>
1414

1515
<build>

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/InitializableEntity.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
abstract class InitializableEntity extends ClientEntity {
1616

1717
//TODO Init and close semantics are primitive now. Fix them with support for other states like Initializing, Closing, and concurrency.
18-
protected InitializableEntity(String clientId, ClientEntity parent) {
19-
super(clientId, parent);
18+
protected InitializableEntity(String clientId) {
19+
super(clientId);
2020
}
2121

2222
/**

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class MessageAndSessionPump extends InitializableEntity implements IMessageAndSe
4848
private int prefetchCount;
4949

5050
public MessageAndSessionPump(MessagingFactory factory, String entityPath, ReceiveMode receiveMode) {
51-
super(StringUtil.getShortRandomString(), null);
51+
super(StringUtil.getShortRandomString());
5252
this.factory = factory;
5353
this.entityPath = entityPath;
5454
this.receiveMode = receiveMode;
@@ -92,7 +92,7 @@ public void registerSessionHandler(ISessionHandler handler, SessionHandlerOption
9292
this.sessionHandlerOptions = handlerOptions;
9393

9494
for (int i = 0; i < handlerOptions.getMaxConcurrentSessions(); i++) {
95-
this.acceptSessionsAndPumpMessage();
95+
this.acceptSessionAndPumpMessages();
9696
}
9797
}
9898

@@ -142,6 +142,12 @@ private void receiveAndPumpMessage() {
142142
onMessageFuture = new CompletableFuture<Void>();
143143
onMessageFuture.completeExceptionally(onMessageSyncEx);
144144
}
145+
146+
// Some clients are returning null from the call
147+
if(onMessageFuture == null)
148+
{
149+
onMessageFuture = CompletableFuture.completedFuture(null);
150+
}
145151

146152
onMessageFuture.handleAsync((v, onMessageEx) -> {
147153
if (onMessageEx != null) {
@@ -203,7 +209,7 @@ private void receiveAndPumpMessage() {
203209
}
204210
}
205211

206-
private void acceptSessionsAndPumpMessage() {
212+
private void acceptSessionAndPumpMessages() {
207213
if (!this.getIsClosingOrClosed()) {
208214
TRACE_LOGGER.debug("Accepting a session from entity '{}'", this.entityPath);
209215
CompletableFuture<IMessageSession> acceptSessionFuture = ClientFactory.acceptSessionFromEntityPathAsync(this.factory, this.entityPath, null, this.receiveMode);
@@ -222,7 +228,7 @@ private void acceptSessionsAndPumpMessage() {
222228
// In case of any other exception, sleep and retry
223229
TRACE_LOGGER.debug("AcceptSession from entity '{}' will be retried after '{}'.", this.entityPath, SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION);
224230
Timer.schedule(() -> {
225-
MessageAndSessionPump.this.acceptSessionsAndPumpMessage();
231+
MessageAndSessionPump.this.acceptSessionAndPumpMessages();
226232
}, SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION, TimerType.OneTimeRun);
227233
}
228234
} else {
@@ -293,6 +299,12 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
293299
onMessageFuture.completeExceptionally(onMessageSyncEx);
294300
}
295301

302+
// Some clients are returning null from the call
303+
if(onMessageFuture == null)
304+
{
305+
onMessageFuture = CompletableFuture.completedFuture(null);
306+
}
307+
296308
onMessageFuture.handleAsync((v, onMessageEx) -> {
297309
renewCancelTimer.cancel(true);
298310
if (onMessageEx != null) {
@@ -419,6 +431,12 @@ synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException() {
419431
onCloseFuture = new CompletableFuture<Void>();
420432
onCloseFuture.completeExceptionally(onCloseSyncEx);
421433
}
434+
435+
// Some clients are returning null from the call
436+
if(onCloseFuture == null)
437+
{
438+
onCloseFuture = CompletableFuture.completedFuture(null);
439+
}
422440

423441
onCloseFuture.handleAsync((v, onCloseEx) -> {
424442
renewCancelTimer.cancel(true);
@@ -441,7 +459,7 @@ synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException() {
441459
}
442460

443461
this.messageAndSessionPump.openSessions.remove(this.session.getSessionId());
444-
this.messageAndSessionPump.acceptSessionsAndPumpMessage();
462+
this.messageAndSessionPump.acceptSessionAndPumpMessages();
445463
return null;
446464
});
447465
return null;

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class MessageReceiver extends InitializableEntity implements IMessageReceiver, I
5454
private final ConcurrentHashMap<UUID, Instant> requestResponseLockTokensToLockTimesMap;
5555

5656
private MessageReceiver(ReceiveMode receiveMode) {
57-
super(StringUtil.getShortRandomString(), null);
57+
super(StringUtil.getShortRandomString());
5858
this.receiveMode = receiveMode;
5959
this.requestResponseLockTokensToLockTimesMap = new ConcurrentHashMap<>();
6060
if (receiveMode == ReceiveMode.PEEKLOCK) {

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ final class MessageSender extends InitializableEntity implements IMessageSender
2828
private boolean isInitialized = false;
2929

3030
private MessageSender() {
31-
super(StringUtil.getShortRandomString(), null);
31+
super(StringUtil.getShortRandomString());
3232
}
3333

3434
MessageSender(ConnectionStringBuilder amqpConnectionStringBuilder) {

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public final class QueueClient extends InitializableEntity implements IQueueClie
3434
private MiscRequestResponseOperationHandler miscRequestResponseHandler;
3535

3636
private QueueClient(ReceiveMode receiveMode, String queuePath) {
37-
super(StringUtil.getShortRandomString(), null);
37+
super(StringUtil.getShortRandomString());
3838
this.receiveMode = receiveMode;
3939
this.queuePath = queuePath;
4040
this.senderCreationLock = new Object();

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public final class SubscriptionClient extends InitializableEntity implements ISu
3636

3737
private SubscriptionClient(ReceiveMode receiveMode, String subscriptionPath)
3838
{
39-
super(StringUtil.getShortRandomString(), null);
39+
super(StringUtil.getShortRandomString());
4040
this.receiveMode = receiveMode;
4141
this.subscriptionPath = subscriptionPath;
4242
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public final class TopicClient extends InitializableEntity implements ITopicClie
2424
private MessageBrowser browser;
2525

2626
private TopicClient() {
27-
super(StringUtil.getShortRandomString(), null);
27+
super(StringUtil.getShortRandomString());
2828
}
2929

3030
public TopicClient(ConnectionStringBuilder amqpConnectionStringBuilder) throws InterruptedException, ServiceBusException {

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,6 @@ public void onConnectionBound(Event event)
6868
sasl.setMechanisms("ANONYMOUS");
6969
}
7070

71-
@Override
72-
public void onConnectionUnbound(Event event)
73-
{
74-
TRACE_LOGGER.debug("Connection.onConnectionUnbound: hostname:{}", event.getConnection().getHostname());
75-
}
76-
7771
@Override
7872
public void onTransportError(Event event)
7973
{
@@ -89,9 +83,10 @@ public void onTransportError(Event event)
8983

9084
this.messagingFactory.onConnectionError(condition);
9185
Connection connection = event.getConnection();
92-
if (connection != null) {
93-
connection.free();
94-
}
86+
if(connection != null)
87+
{
88+
connection.free();
89+
}
9590
}
9691

9792
@Override
@@ -108,38 +103,33 @@ public void onConnectionRemoteClose(Event event)
108103
final ErrorCondition error = connection.getRemoteCondition();
109104

110105
TRACE_LOGGER.debug("onConnectionRemoteClose: hostname:{},errorCondition:{}", connection.getHostname(), error != null ? error.getCondition() + "," + error.getDescription() : null);
111-
112-
if (connection.getRemoteState() != EndpointState.CLOSED)
106+
boolean shouldFreeConnection = connection.getLocalState() == EndpointState.CLOSED;
107+
this.messagingFactory.onConnectionError(error);
108+
if(shouldFreeConnection)
113109
{
114-
connection.close();
110+
connection.free();
115111
}
116-
117-
this.messagingFactory.onConnectionError(error);
118-
this.freeOnCloseResponse(connection);
119112
}
120113

121114
@Override
122115
public void onConnectionFinal(Event event) {
123-
final Transport transport = event.getTransport();
124-
if (transport != null) {
125-
transport.unbind();
126-
transport.free();
127-
}
116+
TRACE_LOGGER.debug("onConnectionFinal: hostname:{}", event.getConnection().getHostname());
128117
}
129118

130119
@Override
131120
public void onConnectionLocalClose(Event event) {
132121
Connection connection = event.getConnection();
133122
TRACE_LOGGER.debug("onConnectionLocalClose: hostname:{}", connection.getHostname());
134-
this.freeOnCloseResponse(connection);
135-
}
136-
137-
private void freeOnCloseResponse(Connection connection) {
138-
if (connection != null &&
139-
connection.getLocalState() == EndpointState.CLOSED &&
140-
(connection.getRemoteState() == EndpointState.CLOSED)) {
141-
connection.free();
142-
}
123+
if(connection.getRemoteState() == EndpointState.CLOSED)
124+
{
125+
// Service closed it first. In some such cases transport is not unbound and causing a leak.
126+
if(connection.getTransport() != null)
127+
{
128+
connection.getTransport().unbind();
129+
}
130+
131+
connection.free();
132+
}
143133
}
144134

145135
private static SslDomain makeDomain(SslDomain.Mode mode)

0 commit comments

Comments
 (0)