Description
I'm trying to send messages to a topic within a transaction. It works for up to 100 messages, but when I try to publish more than 100 messages in the same transaction, the publish fails. My code is below for reference:
`ServiceBusClientBuilder builder = new ServiceBusClientBuilder()
        .enableCrossEntityTransactions()
        .connectionString(connectionString);
ServiceBusProcessorClient processorClient = builder.processor()
        .topicName(announcementTopicName)
        .subscriptionName(announcementSubName)
        .processMessage(context -> {
            ServiceBusSenderClient sender = builder.sender().topicName("test_topic").buildClient();
            ServiceBusTransactionContext txnContext = sender.createTransaction();
            // Complete the received message as part of the same transaction.
            context.complete(new CompleteOptions().setTransactionContext(txnContext));
            try {
                ServiceBusReceivedMessage message = context.getMessage();
                List<ServiceBusMessage> messagesList = new ArrayList<>();
                Map<String, List<ServiceBusMessage>> msgListToSendToAsb = new HashMap<>();
                // 101 messages reproduces the failure; 100 works.
                for (int i = 1; i <= 101; i++) {
                    messagesList.add(new ServiceBusMessage(message.getBody()));
                }
                msgListToSendToAsb.put("test_topic", messagesList);
                for (Map.Entry<String, List<ServiceBusMessage>> entry : msgListToSendToAsb.entrySet()) {
                    ServiceBusMessageBatch currentBatch = sender.createMessageBatch();
                    for (ServiceBusMessage msg : entry.getValue()) {
                        if (currentBatch.tryAddMessage(msg)) {
                            continue;
                        }
                        // The batch is full: send it and start a new one.
                        sender.sendMessages(currentBatch, txnContext);
                        currentBatch = sender.createMessageBatch();
                        if (!currentBatch.tryAddMessage(msg)) {
                            log.info("Message is too large to fit in an empty batch");
                        }
                    }
                    // Send whatever is left in the last batch.
                    if (currentBatch.getCount() > 0) {
                        sender.sendMessages(currentBatch, txnContext);
                    }
                }
                sender.commitTransaction(txnContext);
            } catch (Exception e) {
                // ServiceBusException is a RuntimeException, so one handler covers both cases.
                log.info("Exception while consuming message : " + e);
                sender.rollbackTransaction(txnContext);
            }
        })
        .processError(context -> processError(context, new CountDownLatch(1)))
        .maxConcurrentCalls(5)
        .disableAutoComplete()
        .prefetchCount(1)
        .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
        .maxAutoLockRenewDuration(Duration.ofMinutes(3))
        .buildProcessorClient();`
Exception
I have noticed the exception below:
com.fmr.mom.azure.service.AsbMessageConsumerAsyncClient.lambda$4 - Exception while consuming message : com.azure.core.amqp.exception.AmqpException: errorCondition[amqp:transaction:unknown-id]. description[Transaction is not declared Reference:4a544285-f299-4bc5-a125-6fabaab37e27, TrackingId:a62fe61d-e819-421e-b64c-cdcbd10892e7_G4S3, SystemTracker:gtm, Timestamp:2023-05-29T12:44:42], errorContext[NAMESPACE: sps-grk-ap142540-eastus2-messagebroker-dit.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: coordinator, REFERENCE_ID: coordinator, LINK_CREDIT: 998]
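Since the failure only shows up once the count goes above 100, one workaround I could try is splitting the messages into groups of at most 100 and sending each group in its own transaction. Below is a minimal sketch of just the splitting step, in plain Java with no Service Bus calls. The limit of 100 per transaction is an assumption inferred from the behavior above, not something I have confirmed, and the names `TransactionChunks`, `partition`, and `MAX_PER_TRANSACTION` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a list into consecutive groups of bounded size.
final class TransactionChunks {

    // Assumed per-transaction limit, inferred from "100 works, 101 fails".
    static final int MAX_PER_TRANSACTION = 100;

    // Returns consecutive sub-lists of at most maxPerGroup items, in original order.
    static <T> List<List<T>> partition(List<T> items, int maxPerGroup) {
        if (maxPerGroup <= 0) {
            throw new IllegalArgumentException("maxPerGroup must be positive");
        }
        List<List<T>> groups = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxPerGroup) {
            int end = Math.min(start + maxPerGroup, items.size());
            // Copy the slice so each group is independent of the source list.
            groups.add(new ArrayList<>(items.subList(start, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 101; i++) {
            items.add(i);
        }
        // Prints 100, then 1.
        for (List<Integer> group : partition(items, MAX_PER_TRANSACTION)) {
            System.out.println(group.size());
        }
    }
}
```

Each group would then get its own `createTransaction()` / `commitTransaction()` pair. The trade-off is that the 101 messages are no longer committed all-or-nothing, so this only helps if atomicity across groups is not required.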
Version
- Java 11
- azure-messaging-servicebus (7.14.0)
- Spring Boot (2.2.9)