Skip to content

Commit 07d051c

Browse files
authored
Master (#238)
* Chaning SLF4J dependency version to 1.7. (#233) # Conflicts: # pom.xml * Fixing a broken link in README by adding support doc (#234) * Changin max message size to 1 MB. (#235) Fixes #214 # Conflicts: # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java * Fix to the issue where senders, receivers are not recovering after network outage. (#236) # Conflicts: # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java * Changing version to 1.2.6 * Removing renew lock tests in a session.
1 parent 331d8a8 commit 07d051c

File tree

10 files changed

+109
-89
lines changed

10 files changed

+109
-89
lines changed

.github/SUPPORT.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
## Support resources to check prior to raising issues
2+
3+
1. Azure Service Bus [samples](https://github.com/Azure/azure-service-bus/tree/master/samples)
4+
1. Already [resolved issues](https://github.com/Azure/azure-service-bus-java/issues?q=is%3Aissue+is%3Aclosed)
5+
1. [StackOverflow](https://stackoverflow.com/questions/tagged/azureservicebus)

azure-servicebus/azure-servicebus.pom

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>com.microsoft.azure</groupId>
66
<artifactId>azure-servicebus</artifactId>
7-
<version>1.2.5</version>
7+
<version>1.2.6</version>
88
<licenses>
99
<license>
1010
<name>The MIT License (MIT)</name>
@@ -67,7 +67,7 @@
6767
<dependency>
6868
<groupId>org.slf4j</groupId>
6969
<artifactId>slf4j-api</artifactId>
70-
<version>1.8.0-alpha2</version>
70+
<version>1.7.0</version>
7171
</dependency>
7272
<dependency>
7373
<groupId>com.microsoft.azure</groupId>

azure-servicebus/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<parent>
1010
<groupId>com.microsoft.azure</groupId>
1111
<artifactId>azure-servicebus-parent</artifactId>
12-
<version>1.2.5</version>
12+
<version>1.0.0</version><!-- Need not change this for every release -->
1313
</parent>
1414

1515
<properties>

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ private ClientConstants() { }
6767
public static final String DEADLETTER_REASON_HEADER = "DeadLetterReason";
6868
public static final String DEADLETTER_ERROR_DESCRIPTION_HEADER = "DeadLetterErrorDescription";
6969

70-
public static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024;
70+
public static final int MAX_MESSAGE_LENGTH_BYTES = 1024 * 1024;
7171
public static final int MAX_FRAME_SIZE_BYTES = 64 * 1024;
72-
public static final int MAX_EVENTHUB_AMQP_HEADER_SIZE_BYTES = 512;
72+
public static final int MAX_MESSAGING_AMQP_HEADER_SIZE_BYTES = 512;
7373

7474
public final static Duration TIMER_TOLERANCE = Duration.ofSeconds(1);
7575

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ private void createReceiveLink()
348348
session.open();
349349
BaseHandler.setHandler(session, new SessionHandler(this.receivePath));
350350

351-
final String receiveLinkNamePrefix = StringUtil.getShortRandomString();
351+
final String receiveLinkNamePrefix = "Receiver".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
352352
final String receiveLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ?
353353
receiveLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()) :
354354
receiveLinkNamePrefix;
@@ -590,7 +590,6 @@ public void onOpenComplete(Exception exception)
590590
if(this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone())
591591
{
592592
AsyncUtil.completeFuture(this.receiveLinkReopenFuture, null);
593-
this.receiveLinkReopenFuture = null;
594593
}
595594

596595
this.lastKnownLinkError = null;
@@ -617,7 +616,6 @@ public void onOpenComplete(Exception exception)
617616
{
618617
TRACE_LOGGER.warn("Opening receive link to '{}' failed.", this.receivePath, exception);
619618
AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception);
620-
this.receiveLinkReopenFuture = null;
621619
if(this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException))
622620
{
623621
// No point in retrying to establish a link.. SessionLock is lost
@@ -1122,9 +1120,9 @@ public void onEvent()
11221120
private synchronized CompletableFuture<Void> ensureLinkIsOpen()
11231121
{
11241122
// Send SAS token before opening a link as connection might have been closed and reopened
1125-
if (this.receiveLink.getLocalState() == EndpointState.CLOSED || this.receiveLink.getRemoteState() == EndpointState.CLOSED)
1123+
if (!(this.receiveLink.getLocalState() == EndpointState.ACTIVE && this.receiveLink.getRemoteState() == EndpointState.ACTIVE))
11261124
{
1127-
if(this.receiveLinkReopenFuture == null)
1125+
if(this.receiveLinkReopenFuture == null || this.receiveLinkReopenFuture.isDone())
11281126
{
11291127
TRACE_LOGGER.info("Recreating receive link to '{}'", this.receivePath);
11301128
this.retryPolicy.incrementRetryCount(this.getClientId());
@@ -1149,6 +1147,8 @@ private synchronized CompletableFuture<Void> ensureLinkIsOpen()
11491147
this.sendTokenAndSetRenewTimer(false).handleAsync((v, sendTokenEx) -> {
11501148
if(sendTokenEx != null)
11511149
{
1150+
Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
1151+
TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.receivePath, cause);
11521152
this.receiveLinkReopenFuture.completeExceptionally(sendTokenEx);
11531153
}
11541154
else

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErr
8484
private ScheduledFuture<?> sasTokenRenewTimerFuture;
8585
private CompletableFuture<Void> requestResponseLinkCreationFuture;
8686
private CompletableFuture<Void> sendLinkReopenFuture;
87+
private int maxMessageSize;
8788

8889
public static CompletableFuture<CoreMessageSender> create(
8990
final MessagingFactory factory,
@@ -318,17 +319,17 @@ public CompletableFuture<Void> sendAsync(final Iterable<Message> messages)
318319
int byteArrayOffset = 0;
319320
try
320321
{
321-
Pair<byte[], Integer> encodedPair = Util.encodeMessageToMaxSizeArray(batchMessage);
322+
Pair<byte[], Integer> encodedPair = Util.encodeMessageToMaxSizeArray(batchMessage, this.maxMessageSize);
322323
bytes = encodedPair.getFirstItem();
323324
byteArrayOffset = encodedPair.getSecondItem();
324325

325326
for(Message amqpMessage: messages)
326327
{
327328
Message messageWrappedByData = Proton.message();
328-
encodedPair = Util.encodeMessageToOptimalSizeArray(amqpMessage);
329+
encodedPair = Util.encodeMessageToOptimalSizeArray(amqpMessage, this.maxMessageSize);
329330
messageWrappedByData.setBody(new Data(new Binary(encodedPair.getFirstItem(), 0, encodedPair.getSecondItem())));
330331

331-
int encodedSize = Util.encodeMessageToCustomArray(messageWrappedByData, bytes, byteArrayOffset, ClientConstants.MAX_MESSAGE_LENGTH_BYTES - byteArrayOffset - 1);
332+
int encodedSize = Util.encodeMessageToCustomArray(messageWrappedByData, bytes, byteArrayOffset, this.maxMessageSize - byteArrayOffset - 1);
332333
byteArrayOffset = byteArrayOffset + encodedSize;
333334
}
334335
}
@@ -342,12 +343,12 @@ public CompletableFuture<Void> sendAsync(final Iterable<Message> messages)
342343

343344
return this.sendCoreAsync(bytes, byteArrayOffset, AmqpConstants.AMQP_BATCH_MESSAGE_FORMAT);
344345
}
345-
346+
346347
public CompletableFuture<Void> sendAsync(Message msg)
347348
{
348349
try
349350
{
350-
Pair<byte[], Integer> encodedPair = Util.encodeMessageToOptimalSizeArray(msg);
351+
Pair<byte[], Integer> encodedPair = Util.encodeMessageToOptimalSizeArray(msg, this.maxMessageSize);
351352
return this.sendCoreAsync(encodedPair.getFirstItem(), encodedPair.getSecondItem(), DeliveryImpl.DEFAULT_MESSAGE_FORMAT);
352353
}
353354
catch(PayloadSizeExceededException exception)
@@ -365,13 +366,13 @@ public void onOpenComplete(Exception completionException)
365366
if (completionException == null)
366367
{
367368
this.underlyingFactory.registerForConnectionError(this.sendLink);
369+
this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink);
368370
this.lastKnownLinkError = null;
369371
this.retryPolicy.resetRetryCount(this.getClientId());
370372

371373
if(this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone())
372374
{
373375
AsyncUtil.completeFuture(this.sendLinkReopenFuture, null);
374-
this.sendLinkReopenFuture = null;
375376
}
376377

377378
if (!this.linkFirstOpen.isDone())
@@ -420,7 +421,6 @@ public void onOpenComplete(Exception completionException)
420421
{
421422
TRACE_LOGGER.warn("Opening send link to '{}' failed", this.sendPath, completionException);
422423
AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, completionException);
423-
this.sendLinkReopenFuture = null;
424424
}
425425
}
426426
}
@@ -683,9 +683,9 @@ public void onFlow(final int creditIssued)
683683
private synchronized CompletableFuture<Void> ensureLinkIsOpen()
684684
{
685685
// Send SAS token before opening a link as connection might have been closed and reopened
686-
if (this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED)
686+
if (!(this.sendLink.getLocalState() == EndpointState.ACTIVE && this.sendLink.getRemoteState() == EndpointState.ACTIVE))
687687
{
688-
if(this.sendLinkReopenFuture == null)
688+
if(this.sendLinkReopenFuture == null || this.sendLinkReopenFuture.isDone())
689689
{
690690
TRACE_LOGGER.info("Recreating send link to '{}'", this.sendPath);
691691
this.retryPolicy.incrementRetryCount(CoreMessageSender.this.getClientId());
@@ -710,6 +710,8 @@ private synchronized CompletableFuture<Void> ensureLinkIsOpen()
710710
this.sendTokenAndSetRenewTimer(false).handleAsync((v, sendTokenEx) -> {
711711
if(sendTokenEx != null)
712712
{
713+
Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
714+
TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.sendPath, cause);
713715
this.sendLinkReopenFuture.completeExceptionally(sendTokenEx);
714716
}
715717
else
@@ -788,7 +790,7 @@ private void processSendWork()
788790
sendData = this.pendingSendsData.get(deliveryTag.getDeliveryTag());
789791
if(sendData == null)
790792
{
791-
TRACE_LOGGER.warn("SendData not found for this delivery. path:{}, linkName:{}, deliveryTag:{}", this.sendPath, this.sendLink.getName(), deliveryTag);
793+
TRACE_LOGGER.debug("SendData not found for this delivery. path:{}, linkName:{}, deliveryTag:{}", this.sendPath, this.sendLink.getName(), deliveryTag);
792794
continue;
793795
}
794796
}
@@ -986,10 +988,11 @@ public CompletableFuture<long[]> scheduleMessageAsync(Message[] messages, Durati
986988
Pair<byte[], Integer> encodedPair = null;
987989
try
988990
{
989-
encodedPair = Util.encodeMessageToOptimalSizeArray(message);
991+
encodedPair = Util.encodeMessageToOptimalSizeArray(message, this.maxMessageSize);
990992
}
991993
catch(PayloadSizeExceededException exception)
992994
{
995+
TRACE_LOGGER.error("Payload size of message exceeded limit", exception);
993996
final CompletableFuture<long[]> scheduleMessagesTask = new CompletableFuture<long[]>();
994997
scheduleMessagesTask.completeExceptionally(exception);
995998
return scheduleMessagesTask;

0 commit comments

Comments
 (0)