Skip to content

Commit a25b9d7

Browse files
authored
Cherrypicking fixes from Dev branch (#244)
* Adding a property needed for additional validation to check client type (#240) * Adding a property needed for additional validation to check if the type of client being created matches the entity type. Added unit tests too. * Fixing a unit test # Conflicts: # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLinkCache.java # azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueSendReceiveTests.java * Renaming two public methods. From getScheduledEnqueuedTime to getScheduledEnqueueTime. (#241) Similarly for set. They were mistakenly kept like that in the first release. # Conflicts: # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java * Chaning version to 1.2.7
1 parent 07d051c commit a25b9d7

25 files changed

+522
-75
lines changed

azure-servicebus/azure-servicebus.pom

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>com.microsoft.azure</groupId>
66
<artifactId>azure-servicebus</artifactId>
7-
<version>1.2.6</version>
7+
<version>1.2.7</version>
88
<licenses>
99
<license>
1010
<name>The MIT License (MIT)</name>

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrowsableMessageSession.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.UUID;
1111
import java.util.concurrent.CompletableFuture;
1212

13+
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
1314
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
1415
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
1516

@@ -19,8 +20,8 @@
1920
final class BrowsableMessageSession extends MessageSession {
2021
private static final String INVALID_OPERATION_ERROR_MESSAGE = "Unsupported operation on a browse only session.";
2122

22-
BrowsableMessageSession(String sessionId, MessagingFactory messagingFactory, String entityPath) {
23-
super(messagingFactory, entityPath, sessionId, ReceiveMode.PEEKLOCK);
23+
BrowsableMessageSession(String sessionId, MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType) {
24+
super(messagingFactory, entityPath, entityType, sessionId, ReceiveMode.PEEKLOCK);
2425
// try {
2526
// this.initializeAsync().get();
2627
// } catch (InterruptedException | ExecutionException e) {

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.concurrent.CompletableFuture;
88

99
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
10+
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
1011
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
1112
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
1213
import com.microsoft.azure.servicebus.primitives.Util;
@@ -48,6 +49,10 @@ public static IMessageSender createMessageSenderFromConnectionStringBuilder(Conn
4849
return Utils.completeFuture(createMessageSenderFromConnectionStringBuilderAsync(amqpConnectionStringBuilder));
4950
}
5051

52+
static IMessageSender createMessageSenderFromConnectionStringBuilder(ConnectionStringBuilder amqpConnectionStringBuilder, MessagingEntityType entityType) throws InterruptedException, ServiceBusException {
53+
return Utils.completeFuture(createMessageSenderFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, entityType));
54+
}
55+
5156
/**
5257
* Creates a message sender to the entity using the client settings.
5358
* @param namespaceName namespace of entity
@@ -73,10 +78,26 @@ public static IMessageSender createMessageSenderFromEntityPath(String namespaceN
7378
public static IMessageSender createMessageSenderFromEntityPath(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
7479
return Utils.completeFuture(createMessageSenderFromEntityPathAsync(namespaceEndpointURI, entityPath, clientSettings));
7580
}
81+
82+
static IMessageSender createMessageSenderFromEntityPath(URI namespaceEndpointURI, String entityPath, MessagingEntityType entityType, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
83+
return Utils.completeFuture(createMessageSenderFromEntityPathAsync(namespaceEndpointURI, entityPath, entityType, clientSettings));
84+
}
7685

77-
static IMessageSender createMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException {
86+
/**
87+
* Creates a message sender to the entity.
88+
* @param messagingFactory messaging factory (which represents a connection) on which sender needs to be created
89+
* @param entityPath path of entity
90+
* @return IMessageSender instance
91+
* @throws InterruptedException if the current thread was interrupted while waiting
92+
* @throws ServiceBusException if the sender cannot be created
93+
*/
94+
public static IMessageSender createMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException {
7895
return Utils.completeFuture(createMessageSenderFromEntityPathAsync(messagingFactory, entityPath));
7996
}
97+
98+
static IMessageSender createMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType) throws InterruptedException, ServiceBusException {
99+
return Utils.completeFuture(createMessageSenderFromEntityPathAsync(messagingFactory, entityPath, entityType));
100+
}
80101

81102
/**
82103
* Create message sender asynchronously from connection string with <a href="https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-sas">Shared Access Signatures</a>
@@ -100,6 +121,11 @@ public static CompletableFuture<IMessageSender> createMessageSenderFromConnectio
100121
return createMessageSenderFromEntityPathAsync(amqpConnectionStringBuilder.getEndpoint(), amqpConnectionStringBuilder.getEntityPath(), Util.getClientSettingsFromConnectionStringBuilder(amqpConnectionStringBuilder));
101122
}
102123

124+
static CompletableFuture<IMessageSender> createMessageSenderFromConnectionStringBuilderAsync(ConnectionStringBuilder amqpConnectionStringBuilder, MessagingEntityType entityType) {
125+
Utils.assertNonNull("amqpConnectionStringBuilder", amqpConnectionStringBuilder);
126+
return createMessageSenderFromEntityPathAsync(amqpConnectionStringBuilder.getEndpoint(), amqpConnectionStringBuilder.getEntityPath(), entityType, Util.getClientSettingsFromConnectionStringBuilder(amqpConnectionStringBuilder));
127+
}
128+
103129
/**
104130
* Creates a message sender asynchronously to the entity using the client settings.
105131
* @param namespaceName namespace name of entity
@@ -122,15 +148,30 @@ public static CompletableFuture<IMessageSender> createMessageSenderFromEntityPat
122148
* @return a CompletableFuture representing the pending creating of IMessageSender instance
123149
*/
124150
public static CompletableFuture<IMessageSender> createMessageSenderFromEntityPathAsync(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings)
151+
{
152+
return createMessageSenderFromEntityPathAsync(namespaceEndpointURI, entityPath, null, clientSettings);
153+
}
154+
155+
static CompletableFuture<IMessageSender> createMessageSenderFromEntityPathAsync(URI namespaceEndpointURI, String entityPath, MessagingEntityType entityType, ClientSettings clientSettings)
125156
{
126157
Utils.assertNonNull("namespaceEndpointURI", namespaceEndpointURI);
127-
MessageSender sender = new MessageSender(namespaceEndpointURI, entityPath, clientSettings);
158+
MessageSender sender = new MessageSender(namespaceEndpointURI, entityPath, entityType, clientSettings);
128159
return sender.initializeAsync().thenApply((v) -> sender);
129160
}
130161

131-
static CompletableFuture<IMessageSender> createMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath) {
162+
/**
163+
* Creates a message sender asynchronously to the entity using the {@link MessagingFactory}
164+
* @param messagingFactory messaging factory (which represents a connection) on which sender needs to be created
165+
* @param entityPath path of entity
166+
* @return a CompletableFuture representing the pending creating of IMessageSender instance
167+
*/
168+
public static CompletableFuture<IMessageSender> createMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath) {
169+
return createMessageSenderFromEntityPathAsync(messagingFactory, entityPath, null);
170+
}
171+
172+
static CompletableFuture<IMessageSender> createMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType) {
132173
Utils.assertNonNull("messagingFactory", messagingFactory);
133-
MessageSender sender = new MessageSender(messagingFactory, entityPath);
174+
MessageSender sender = new MessageSender(messagingFactory, entityPath, entityType);
134175
return sender.initializeAsync().thenApply((v) -> sender);
135176
}
136177

@@ -252,6 +293,10 @@ static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory mes
252293
static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException {
253294
return Utils.completeFuture(createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, receiveMode));
254295
}
296+
297+
static IMessageReceiver createMessageReceiverFromEntityPath(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException {
298+
return Utils.completeFuture(createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, entityType, receiveMode));
299+
}
255300

256301
/**
257302
* Create {@link IMessageReceiver} in default {@link ReceiveMode#PEEKLOCK} mode asynchronously from connection string with <a href="https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-sas">Shared Access Signatures</a>
@@ -343,7 +388,7 @@ public static CompletableFuture<IMessageReceiver> createMessageReceiverFromEntit
343388
public static CompletableFuture<IMessageReceiver> createMessageReceiverFromEntityPathAsync(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings, ReceiveMode receiveMode) {
344389
Utils.assertNonNull("namespaceEndpointURI", namespaceEndpointURI);
345390
Utils.assertNonNull("entityPath", entityPath);
346-
MessageReceiver receiver = new MessageReceiver(namespaceEndpointURI, entityPath, clientSettings, receiveMode);
391+
MessageReceiver receiver = new MessageReceiver(namespaceEndpointURI, entityPath, null, clientSettings, receiveMode);
347392
return receiver.initializeAsync().thenApply((v) -> receiver);
348393
}
349394

@@ -352,8 +397,12 @@ static CompletableFuture<IMessageReceiver> createMessageReceiverFromEntityPathAs
352397
}
353398

354399
static CompletableFuture<IMessageReceiver> createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) {
400+
return createMessageReceiverFromEntityPathAsync(messagingFactory, entityPath, null, receiveMode);
401+
}
402+
403+
static CompletableFuture<IMessageReceiver> createMessageReceiverFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType, ReceiveMode receiveMode) {
355404
Utils.assertNonNull("messagingFactory", messagingFactory);
356-
MessageReceiver receiver = new MessageReceiver(messagingFactory, entityPath, receiveMode);
405+
MessageReceiver receiver = new MessageReceiver(messagingFactory, entityPath, entityType, receiveMode);
357406
return receiver.initializeAsync().thenApply((v) -> receiver);
358407
}
359408

@@ -575,7 +624,7 @@ public static CompletableFuture<IMessageSession> acceptSessionFromEntityPathAsyn
575624
public static CompletableFuture<IMessageSession> acceptSessionFromEntityPathAsync(URI namespaceEndpointURI, String entityPath, String sessionId, ClientSettings clientSettings, ReceiveMode receiveMode) {
576625
Utils.assertNonNull("namespaceEndpointURI", namespaceEndpointURI);
577626
Utils.assertNonNull("entityPath", entityPath);
578-
MessageSession session = new MessageSession(namespaceEndpointURI, entityPath, sessionId, clientSettings, receiveMode);
627+
MessageSession session = new MessageSession(namespaceEndpointURI, entityPath, null, sessionId, clientSettings, receiveMode);
579628
return session.initializeAsync().thenApply((v) -> session);
580629
}
581630

@@ -584,8 +633,12 @@ static CompletableFuture<IMessageSession> acceptSessionFromEntityPathAsync(Messa
584633
}
585634

586635
static CompletableFuture<IMessageSession> acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) {
636+
return acceptSessionFromEntityPathAsync(messagingFactory, entityPath, null, sessionId, receiveMode);
637+
}
638+
639+
static CompletableFuture<IMessageSession> acceptSessionFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, MessagingEntityType entityType, String sessionId, ReceiveMode receiveMode) {
587640
Utils.assertNonNull("messagingFactory", messagingFactory);
588-
MessageSession session = new MessageSession(messagingFactory, entityPath, sessionId, receiveMode);
641+
MessageSession session = new MessageSession(messagingFactory, entityPath, entityType, sessionId, receiveMode);
589642
return session.initializeAsync().thenApply((v) -> session);
590643
}
591644
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,16 +137,41 @@ public interface IMessage {
137137
*
138138
* @return the instant at which the message will be enqueued in Azure Service Bus
139139
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-sequencing">Message Sequencing and Timestamps</a>
140+
* @deprecated Replaced by {@link #getScheduledEnqueueTimeUtc()
140141
*/
142+
@Deprecated
141143
public Instant getScheduledEnqueuedTimeUtc();
142144

143145
/**
144146
* Sets the scheduled enqueue time of this message.
145147
*
146148
* @param scheduledEnqueueTimeUtc the instant at which this message should be enqueued in Azure Service Bus
147-
* @see #getScheduledEnqueuedTimeUtc()
149+
* @see #getScheduledEnqueueTimeUtc()
150+
* @deprecated Replaced by {@link #setScheduledEnqueueTimeUtc(Instant)()
148151
*/
152+
@Deprecated
149153
public void setScheduledEnqueuedTimeUtc(Instant scheduledEnqueueTimeUtc);
154+
155+
/**
156+
* Gets the scheduled enqueue time of this message.
157+
*
158+
* This value is used for delayed message availability. The message is safely added to
159+
* the queue, but is not considered active and therefore not retrievable until the
160+
* scheduled enqueue time. Mind that the message may not be activated (enqueued) at the exact given
161+
* instant; the actual activation time depends on the queue's workload and its state.
162+
*
163+
* @return the instant at which the message will be enqueued in Azure Service Bus
164+
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-sequencing">Message Sequencing and Timestamps</a>
165+
*/
166+
public Instant getScheduledEnqueueTimeUtc();
167+
168+
/**
169+
* Sets the scheduled enqueue time of this message.
170+
*
171+
* @param scheduledEnqueueTimeUtc the instant at which this message should be enqueued in Azure Service Bus
172+
* @see #getScheduledEnqueueTimeUtc()
173+
*/
174+
public void setScheduledEnqueueTimeUtc(Instant scheduledEnqueueTimeUtc);
150175

151176
/**
152177
* Gets the unique number assigned to a message by Service Bus.

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,15 +254,27 @@ public void setReplyToSessionId(String replyToSessionId) {
254254
this.replyToSessionId = replyToSessionId;
255255
}
256256

257+
@Deprecated
257258
@Override
258259
public Instant getScheduledEnqueuedTimeUtc() {
259-
return this.scheduledEnqueueTimeUtc;
260+
return this.getScheduledEnqueueTimeUtc();
260261
}
261262

263+
@Deprecated
262264
@Override
263265
public void setScheduledEnqueuedTimeUtc(Instant scheduledEnqueueTimeUtc) {
266+
this.setScheduledEnqueueTimeUtc(scheduledEnqueueTimeUtc);
267+
}
268+
269+
@Override
270+
public Instant getScheduledEnqueueTimeUtc() {
271+
return this.scheduledEnqueueTimeUtc;
272+
}
273+
274+
@Override
275+
public void setScheduledEnqueueTimeUtc(Instant scheduledEnqueueTimeUtc) {
264276
this.scheduledEnqueueTimeUtc = scheduledEnqueueTimeUtc;
265-
}
277+
}
266278

267279
@Override
268280
public String getPartitionKey() {
@@ -301,5 +313,4 @@ void setDeliveryTag(byte[] deliveryTag)
301313
{
302314
this.deliveryTag = deliveryTag;
303315
}
304-
305316
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import com.microsoft.azure.servicebus.primitives.ExceptionUtil;
1818
import com.microsoft.azure.servicebus.primitives.MessageLockLostException;
19+
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
1920
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
2021
import com.microsoft.azure.servicebus.primitives.OperationCancelledException;
2122
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
@@ -36,6 +37,7 @@ class MessageAndSessionPump extends InitializableEntity implements IMessageAndSe
3637
private final MessagingFactory factory;
3738
private final String entityPath;
3839
private final ReceiveMode receiveMode;
40+
private final MessagingEntityType entityType;
3941
private IMessageReceiver innerReceiver;
4042

4143
private boolean handlerRegistered = false;
@@ -45,10 +47,11 @@ class MessageAndSessionPump extends InitializableEntity implements IMessageAndSe
4547
private SessionHandlerOptions sessionHandlerOptions;
4648
private int prefetchCount;
4749

48-
public MessageAndSessionPump(MessagingFactory factory, String entityPath, ReceiveMode receiveMode) {
50+
public MessageAndSessionPump(MessagingFactory factory, String entityPath, MessagingEntityType entityType, ReceiveMode receiveMode) {
4951
super(StringUtil.getShortRandomString());
5052
this.factory = factory;
5153
this.entityPath = entityPath;
54+
this.entityType = entityType;
5255
this.receiveMode = receiveMode;
5356
this.openSessions = new ConcurrentHashMap<>();
5457
this.prefetchCount = UNSET_PREFETCH_COUNT;
@@ -66,7 +69,7 @@ public void registerMessageHandler(IMessageHandler handler, MessageHandlerOption
6669
this.messageHandler = handler;
6770
this.messageHandlerOptions = handlerOptions;
6871

69-
this.innerReceiver = ClientFactory.createMessageReceiverFromEntityPath(this.factory, this.entityPath, this.receiveMode);
72+
this.innerReceiver = ClientFactory.createMessageReceiverFromEntityPath(this.factory, this.entityPath, this.entityType, this.receiveMode);
7073
TRACE_LOGGER.info("Created MessageReceiver to entity '{}'", this.entityPath);
7174
if(this.prefetchCount != UNSET_PREFETCH_COUNT)
7275
{
@@ -210,7 +213,7 @@ private void receiveAndPumpMessage() {
210213
private void acceptSessionAndPumpMessages() {
211214
if (!this.getIsClosingOrClosed()) {
212215
TRACE_LOGGER.debug("Accepting a session from entity '{}'", this.entityPath);
213-
CompletableFuture<IMessageSession> acceptSessionFuture = ClientFactory.acceptSessionFromEntityPathAsync(this.factory, this.entityPath, null, this.receiveMode);
216+
CompletableFuture<IMessageSession> acceptSessionFuture = ClientFactory.acceptSessionFromEntityPathAsync(this.factory, this.entityPath, this.entityType, null, this.receiveMode);
214217
acceptSessionFuture.handleAsync((session, acceptSessionEx) -> {
215218
if (acceptSessionEx != null) {
216219
acceptSessionEx = ExceptionUtil.extractAsyncCompletionCause(acceptSessionEx);

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ public static org.apache.qpid.proton.message.Message convertBrokeredMessageToAmq
5050
amqpMessage.setGroupId(brokeredMessage.getSessionId());
5151

5252
Map<Symbol, Object> messageAnnotationsMap = new HashMap<Symbol, Object>();
53-
if(brokeredMessage.getScheduledEnqueuedTimeUtc() != null)
53+
if(brokeredMessage.getScheduledEnqueueTimeUtc() != null)
5454
{
55-
messageAnnotationsMap.put(Symbol.valueOf(ClientConstants.SCHEDULEDENQUEUETIMENAME), Date.from(brokeredMessage.getScheduledEnqueuedTimeUtc()));
55+
messageAnnotationsMap.put(Symbol.valueOf(ClientConstants.SCHEDULEDENQUEUETIMENAME), Date.from(brokeredMessage.getScheduledEnqueueTimeUtc()));
5656
}
5757

5858
if(!StringUtil.isNullOrEmpty(brokeredMessage.getPartitionKey()))
@@ -158,7 +158,7 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton
158158
brokeredMessage.setEnqueuedTimeUtc(((Date)entry.getValue()).toInstant());
159159
break;
160160
case ClientConstants.SCHEDULEDENQUEUETIMENAME:
161-
brokeredMessage.setScheduledEnqueuedTimeUtc(((Date)entry.getValue()).toInstant());
161+
brokeredMessage.setScheduledEnqueueTimeUtc(((Date)entry.getValue()).toInstant());
162162
break;
163163
case ClientConstants.SEQUENCENUBMERNAME:
164164
brokeredMessage.setSequenceNumber((long)entry.getValue());

0 commit comments

Comments
 (0)