Skip to content

Commit 71a3135

Browse files
authored
Send-Via / TransferSender to send messages across entities in a transaction (#215)
This feature lets a user send messages across entities in a single transaction. A single transaction cannot theoretically span across entities. So as to support cross-entity transaction, you can now send a message to a `destination-queue` via another queue. The transaction will be performed on the `via-queue`, and once successful, the message will be forwarded/transferred to its intended destination. Sample: ```java IMessageSender viaQueueSender = ClientFactory.createMessageSenderFromEntityPath(factory, viaQueueName); IMessageReceiver viaQueueReceiver = ClientFactory.createMessageReceiverFromEntityPath(factory, viaQueueName, ReceiveMode.PEEKLOCK); IMessageSender destinationViaSender = ClientFactory.createTransferMessageSenderFromEntityPath(factory, destinationQueueName, viaQueueName); Message message1 = new Message("message"); message1.setMessageId("1"); message1.setPartitionKey("pk1"); Message message2 = new Message("message"); message2.setMessageId("2"); message2.setPartitionKey("pk2"); message2.setViaPartitionKey("pk1"); viaQueueSender.send(message1); IMessage receivedMessage = viaQueueReceiver.receive(); TransactionContext transaction = this.factory.startTransaction(); viaQueueReceiver.complete(receivedMessage.getLockToken(), transaction); destinationViaSender.send(message2, transaction); transaction.commit(); ``` In the above example, operations on both `message1` and `message2` (intended for different entities) are being performed within the same entity (i.e., the `via-queue`)
1 parent dd8c132 commit 71a3135

File tree

17 files changed

+417
-115
lines changed

17 files changed

+417
-115
lines changed

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,25 @@ public static IMessageSender createMessageSenderFromEntityPath(MessagingFactory
8686
return Utils.completeFuture(createMessageSenderFromEntityPathAsync(messagingFactory, entityPath));
8787
}
8888

89+
/**
90+
* Creates a transfer message sender. This sender sends message to destination entity via another entity.
91+
*
92+
* This is mainly to be used when sending messages in a transaction.
93+
* When messages need to be sent across entities in a single transaction, this can be used to ensure
94+
* all the messages land initially in the same entity/partition for local transactions, and then
95+
* let service bus handle transferring the message to the actual destination.
96+
* @param messagingFactory messaging factory (which represents a connection) on which sender needs to be created.
97+
* @param entityPath path of the final destination of the message.
98+
* @param viaEntityPath The initial destination of the message.
99+
* @return IMessageSender instance
100+
* @throws InterruptedException if the current thread was interrupted while waiting
101+
* @throws ServiceBusException if the sender cannot be created
102+
*/
103+
public static IMessageSender createTransferMessageSenderFromEntityPath(MessagingFactory messagingFactory, String entityPath, String viaEntityPath) throws InterruptedException, ServiceBusException
104+
{
105+
return Utils.completeFuture(createTransferMessageSenderFromEntityPathAsync(messagingFactory, entityPath, viaEntityPath));
106+
}
107+
89108
/**
90109
* Create message sender asynchronously from connection string with <a href="https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-sas">Shared Access Signatures</a>
91110
*
@@ -132,7 +151,7 @@ public static CompletableFuture<IMessageSender> createMessageSenderFromEntityPat
132151
public static CompletableFuture<IMessageSender> createMessageSenderFromEntityPathAsync(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings)
133152
{
134153
Utils.assertNonNull("namespaceEndpointURI", namespaceEndpointURI);
135-
MessageSender sender = new MessageSender(namespaceEndpointURI, entityPath, clientSettings);
154+
MessageSender sender = new MessageSender(namespaceEndpointURI, entityPath, null, clientSettings);
136155
return sender.initializeAsync().thenApply((v) -> sender);
137156
}
138157

@@ -148,6 +167,25 @@ public static CompletableFuture<IMessageSender> createMessageSenderFromEntityPat
148167
return sender.initializeAsync().thenApply((v) -> sender);
149168
}
150169

170+
/**
171+
* Creates a transfer message sender asynchronously. This sender sends message to destination entity via another entity.
172+
*
173+
* This is mainly to be used when sending messages in a transaction.
174+
* When messages need to be sent across entities in a single transaction, this can be used to ensure
175+
* all the messages land initially in the same entity/partition for local transactions, and then
176+
* let service bus handle transferring the message to the actual destination.
177+
* @param messagingFactory messaging factory (which represents a connection) on which sender needs to be created.
178+
* @param entityPath path of the final destination of the message.
179+
* @param viaEntityPath The initial destination of the message.
180+
* @return a CompletableFuture representing the pending creating of IMessageSender instance.
181+
*/
182+
public static CompletableFuture<IMessageSender> createTransferMessageSenderFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String viaEntityPath)
183+
{
184+
Utils.assertNonNull("messagingFactory", messagingFactory);
185+
MessageSender sender = new MessageSender(messagingFactory, viaEntityPath, entityPath);
186+
return sender.initializeAsync().thenApply((v) -> sender);
187+
}
188+
151189
/**
152190
* Create {@link IMessageReceiver} in default {@link ReceiveMode#PEEKLOCK} mode from service bus connection string with <a href="https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-sas">Shared Access Signatures</a>
153191
*

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,26 @@ public interface IMessage {
332332
*/
333333
public void setPartitionKey(String partitionKey);
334334

335+
/**
336+
* Gets the partition key for sending a message to a entity via another partitioned transfer entity.
337+
*
338+
* If a message is sent via a transfer queue in the scope of a transaction, this value selects the
339+
* transfer queue partition: This is functionally equivalent to {@link #getPartitionKey()} and ensures that
340+
* messages are kept together and in order as they are transferred.
341+
*
342+
* @return partition key on the via queue.
343+
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-transactions#transfers-and-send-via">Transfers and Send Via</a>.
344+
*/
345+
public String getViaPartitionKey();
346+
347+
/**
348+
* Sets a via-partition key for sending a message to a destination entity via another partitioned entity
349+
*
350+
* @param viaPartitionKey via-partition key of this message
351+
* @see #getViaPartitionKey()
352+
*/
353+
public void setViaPartitionKey(String viaPartitionKey);
354+
335355
/**
336356
* Gets the name of the queue or subscription that this message was enqueued on, before it was deadlettered.
337357
*

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSender.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,6 @@ public interface IMessageSender extends IMessageEntityClient {
122122
*/
123123
CompletableFuture<Void> cancelScheduledMessageAsync(long sequenceNumber);
124124

125-
/**
126-
* Cancels the enqueuing of an already sent scheduled message, if it was not already enqueued. This is an asynchronous method returning a CompletableFuture which completes when the message is cancelled.
127-
*
128-
* @param sequenceNumber sequence number of the scheduled message
129-
* @param transaction {@link TransactionContext} which this operation should enlist to.
130-
* @return a CompletableFuture representing the pending cancellation
131-
*/
132-
CompletableFuture<Void> cancelScheduledMessageAsync(long sequenceNumber, TransactionContext transaction);
133-
134125
/**
135126
* Sends a scheduled message to the Azure Service Bus entity this sender is connected to. A scheduled message is enqueued and made available to receivers only at the scheduled enqueue time.
136127
* This method blocks until the message is sent to the entity. Calling this method is equivalent to calling <code>scheduleMessageAsync(message, scheduledEnqueueTimeUtc).get()</code>. For better performance, use async methods.
@@ -165,15 +156,4 @@ public interface IMessageSender extends IMessageEntityClient {
165156
* @throws ServiceBusException if scheduled message couldn't be cancelled
166157
*/
167158
void cancelScheduledMessage(long sequenceNumber) throws InterruptedException, ServiceBusException;
168-
169-
/**
170-
* Cancels the enqueuing of an already sent scheduled message, if it was not already enqueued. This method blocks until the message is sent to the entity. Calling this method is equivalent to calling <code>cancelScheduledMessageAsync(sequenceNumber).get()</code>.
171-
* For better performance, use async methods.
172-
*
173-
* @param sequenceNumber sequence number of the scheduled message
174-
* @param transaction {@link TransactionContext} which this operation should enlist to.
175-
* @throws InterruptedException if the current thread was interrupted while waiting
176-
* @throws ServiceBusException if scheduled message couldn't be cancelled
177-
*/
178-
void cancelScheduledMessage(long sequenceNumber, TransactionContext transaction) throws InterruptedException, ServiceBusException;
179159
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ final public class Message implements Serializable, IMessage {
5858
private String replyTo;
5959

6060
private String partitionKey;
61+
62+
private String viaPartitionKey;
6163

6264
private String deadLetterSource;
6365

@@ -272,7 +274,17 @@ public String getPartitionKey() {
272274
@Override
273275
public void setPartitionKey(String partitionKey) {
274276
this.partitionKey = partitionKey;
275-
}
277+
}
278+
279+
@Override
280+
public String getViaPartitionKey() {
281+
return this.viaPartitionKey;
282+
}
283+
284+
@Override
285+
public void setViaPartitionKey(String partitionKey) {
286+
this.viaPartitionKey = partitionKey;
287+
}
276288

277289
@Override
278290
public String getDeadLetterSource() {

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void registerMessageHandler(IMessageHandler handler) throws InterruptedEx
6161

6262
@Override
6363
public void registerMessageHandler(IMessageHandler handler, MessageHandlerOptions handlerOptions) throws InterruptedException, ServiceBusException {
64-
TRACE_LOGGER.info("Registering message handler on entity '{}' with '{}'", this.entityPath, handlerOptions);
64+
TRACE_LOGGER.info("Registering message handler on entity '{}' with '{}'.", this.entityPath, handlerOptions);
6565
this.setHandlerRegistered();
6666
this.messageHandler = handler;
6767
this.messageHandlerOptions = handlerOptions;

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public static org.apache.qpid.proton.message.Message convertBrokeredMessageToAmq
5959
{
6060
messageAnnotationsMap.put(Symbol.valueOf(ClientConstants.PARTITIONKEYNAME), brokeredMessage.getPartitionKey());
6161
}
62+
63+
if(!StringUtil.isNullOrEmpty(brokeredMessage.getViaPartitionKey()))
64+
{
65+
messageAnnotationsMap.put(Symbol.valueOf(ClientConstants.VIAPARTITIONKEYNAME), brokeredMessage.getViaPartitionKey());
66+
}
6267

6368
amqpMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
6469

@@ -169,6 +174,9 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton
169174
case ClientConstants.PARTITIONKEYNAME:
170175
brokeredMessage.setPartitionKey((String)entry.getValue());
171176
break;
177+
case ClientConstants.VIAPARTITIONKEYNAME:
178+
brokeredMessage.setViaPartitionKey((String)entry.getValue());
179+
break;
172180
case ClientConstants.DEADLETTERSOURCENAME:
173181
brokeredMessage.setDeadLetterSource((String)entry.getValue());
174182
break;

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ final class MessageSender extends InitializableEntity implements IMessageSender
2222
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessageSender.class);
2323
private boolean ownsMessagingFactory;
2424
private String entityPath = null;
25+
private String transferDestinationPath = null;
2526
private MessagingFactory messagingFactory = null;
2627
private CoreMessageSender internalSender = null;
2728
private boolean isInitialized = false;
@@ -32,24 +33,30 @@ private MessageSender() {
3233
super(StringUtil.getShortRandomString());
3334
}
3435

35-
MessageSender(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings) {
36+
MessageSender(URI namespaceEndpointURI, String entityPath, String transferDestinationPath, ClientSettings clientSettings) {
3637
this();
3738

3839
this.namespaceEndpointURI = namespaceEndpointURI;
40+
this.transferDestinationPath = transferDestinationPath;
3941
this.entityPath = entityPath;
4042
this.clientSettings = clientSettings;
4143
this.ownsMessagingFactory = true;
4244
}
4345

4446
MessageSender(MessagingFactory messagingFactory, String entityPath) {
45-
this(messagingFactory, entityPath, false);
47+
this(messagingFactory, entityPath, null, false);
4648
}
4749

48-
private MessageSender(MessagingFactory messagingFactory, String entityPath, boolean ownsMessagingFactory) {
50+
MessageSender(MessagingFactory messagingFactory, String entityPath, String transferDestinationPath) {
51+
this(messagingFactory, entityPath, transferDestinationPath, false);
52+
}
53+
54+
private MessageSender(MessagingFactory messagingFactory, String entityPath, String transferDestinationPath, boolean ownsMessagingFactory) {
4955
this();
5056

5157
this.messagingFactory = messagingFactory;
5258
this.entityPath = entityPath;
59+
this.transferDestinationPath = transferDestinationPath;
5360
this.ownsMessagingFactory = ownsMessagingFactory;
5461
}
5562

@@ -77,7 +84,7 @@ synchronized CompletableFuture<Void> initializeAsync() {
7784
return factoryFuture.thenComposeAsync((v) ->
7885
{
7986
TRACE_LOGGER.info("Creating MessageSender to entity '{}'", this.entityPath);
80-
CompletableFuture<CoreMessageSender> senderFuture = CoreMessageSender.create(this.messagingFactory, StringUtil.getShortRandomString(), this.entityPath);
87+
CompletableFuture<CoreMessageSender> senderFuture = CoreMessageSender.create(this.messagingFactory, StringUtil.getShortRandomString(), this.entityPath, this.transferDestinationPath);
8188
CompletableFuture<Void> postSenderCreationFuture = new CompletableFuture<Void>();
8289
senderFuture.handleAsync((s, coreSenderCreationEx) -> {
8390
if (coreSenderCreationEx == null) {
@@ -195,14 +202,8 @@ public CompletableFuture<Long> scheduleMessageAsync(IMessage message, Instant sc
195202

196203
@Override
197204
public CompletableFuture<Void> cancelScheduledMessageAsync(long sequenceNumber) {
198-
return this.cancelScheduledMessageAsync(sequenceNumber, TransactionContext.NULL_TXN);
199-
}
200-
201-
@Override
202-
public CompletableFuture<Void> cancelScheduledMessageAsync(long sequenceNumber, TransactionContext transaction) {
203205
return this.internalSender.cancelScheduledMessageAsync(
204206
new Long[]{sequenceNumber},
205-
transaction,
206207
this.messagingFactory.getClientSetttings().getOperationTimeout());
207208
}
208209

@@ -221,11 +222,6 @@ public void cancelScheduledMessage(long sequenceNumber) throws InterruptedExcept
221222
Utils.completeFuture(this.cancelScheduledMessageAsync(sequenceNumber));
222223
}
223224

224-
@Override
225-
public void cancelScheduledMessage(long sequenceNumber, TransactionContext transaction) throws InterruptedException, ServiceBusException {
226-
Utils.completeFuture(this.cancelScheduledMessageAsync(sequenceNumber, transaction));
227-
}
228-
229225
MessagingFactory getMessagingFactory() {
230226
return this.messagingFactory;
231227
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -207,14 +207,9 @@ public CompletableFuture<Long> scheduleMessageAsync(IMessage message, Instant sc
207207

208208
@Override
209209
public CompletableFuture<Void> cancelScheduledMessageAsync(long sequenceNumber) {
210-
return this.cancelScheduledMessageAsync(sequenceNumber, TransactionContext.NULL_TXN);
211-
}
212-
213-
@Override
214-
public CompletableFuture<Void> cancelScheduledMessageAsync(long sequenceNumber, TransactionContext transaction) {
215210
return this.createSenderAsync().thenComposeAsync((v) ->
216211
{
217-
return this.sender.cancelScheduledMessageAsync(sequenceNumber, transaction);
212+
return this.sender.cancelScheduledMessageAsync(sequenceNumber);
218213
});
219214
}
220215

@@ -233,11 +228,6 @@ public void cancelScheduledMessage(long sequenceNumber) throws InterruptedExcept
233228
Utils.completeFuture(this.cancelScheduledMessageAsync(sequenceNumber));
234229
}
235230

236-
@Override
237-
public void cancelScheduledMessage(long sequenceNumber, TransactionContext transaction) throws InterruptedException, ServiceBusException {
238-
Utils.completeFuture(this.cancelScheduledMessageAsync(sequenceNumber, transaction));
239-
}
240-
241231
@Override
242232
public String getEntityPath() {
243233
return this.queuePath;

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,6 @@ public CompletableFuture<Void> cancelScheduledMessageAsync(long sequenceNumber)
115115
return this.sender.cancelScheduledMessageAsync(sequenceNumber);
116116
}
117117

118-
@Override
119-
public CompletableFuture<Void> cancelScheduledMessageAsync(long sequenceNumber, TransactionContext transaction) {
120-
return this.sender.cancelScheduledMessageAsync(sequenceNumber, transaction);
121-
}
122-
123118
@Override
124119
public long scheduleMessage(IMessage message, Instant scheduledEnqueueTimeUtc) throws InterruptedException, ServiceBusException {
125120
return this.sender.scheduleMessage(message, scheduledEnqueueTimeUtc);
@@ -135,11 +130,6 @@ public void cancelScheduledMessage(long sequenceNumber) throws InterruptedExcept
135130
this.sender.cancelScheduledMessage(sequenceNumber);
136131
}
137132

138-
@Override
139-
public void cancelScheduledMessage(long sequenceNumber, TransactionContext transaction) throws InterruptedException, ServiceBusException {
140-
this.sender.cancelScheduledMessage(sequenceNumber, transaction);
141-
}
142-
143133
@Override
144134
public String getEntityPath() {
145135
return this.sender.getEntityPath();

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ private ClientConstants() { }
3737
//public static final String LOCKTOKENNAME = "x-opt-lock-token";
3838
public static final String LOCKEDUNTILNAME = "x-opt-locked-until";
3939
public static final String PARTITIONKEYNAME = "x-opt-partition-key";
40+
public static final String VIAPARTITIONKEYNAME = "x-opt-via-partition-key";
4041
public static final String DEADLETTERSOURCENAME = "x-opt-deadletter-source";
4142
public static final UUID ZEROLOCKTOKEN = new UUID(0l, 0l);
4243

@@ -52,6 +53,7 @@ private ClientConstants() { }
5253
public final static Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked");
5354
public final static Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout");
5455
public final static Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout");
56+
public final static Symbol LINK_TRANSFER_DESTINATION_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":transfer-destination-address");
5557
public final static Symbol LINK_PEEKMODE_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":peek-mode");
5658
public final static Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id");
5759
public static final Symbol DEADLETTERNAME = Symbol.valueOf(AmqpConstants.VENDOR + ":dead-letter");
@@ -118,6 +120,7 @@ private ClientConstants() { }
118120
public static final String REQUEST_RESPONSE_MESSAGE_ID = "message-id";
119121
public static final String REQUEST_RESPONSE_SESSION_ID = "session-id";
120122
public static final String REQUEST_RESPONSE_PARTITION_KEY = "partition-key";
123+
public static final String REQUEST_RESPONSE_VIA_PARTITION_KEY = "via-partition-key";
121124
public static final String REQUEST_RESPONSE_FROM_SEQUENCE_NUMER = "from-sequence-number";
122125
public static final String REQUEST_RESPONSE_MESSAGE_COUNT = "message-count";
123126
public static final String REQUEST_RESPONSE_STATUS_CODE = "statusCode";

0 commit comments

Comments
 (0)