From 59dd60aac71e3f417233b22bb02be8e32045cf96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=93mar=20Kjartan=20Yasin?= Date: Thu, 6 Feb 2025 12:57:37 -0800 Subject: [PATCH] [improve][client] Add newMessage with schema and transactions Pulsar Client allows callers to create messages with a schema or a transaction, but not both. This commit adds a new method in the producer that allows callers to create a message with both a schema and transaction. --- .../broker/transaction/TransactionTest.java | 37 +++++++++++++++++++ .../apache/pulsar/client/api/Producer.java | 15 ++++++++ .../pulsar/client/impl/ProducerBase.java | 9 +++++ 3 files changed, 61 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 35c9048ebb554..43d3f61279837 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -35,6 +35,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.Unpooled; @@ -484,6 +485,42 @@ public void testAsyncSendOrAckForSingleFuture() throws Exception { }); } + @Test + public void testSendAndAckWithSchema() throws Exception { + String topic = NAMESPACE1 + "/testAsyncSendAndAckWithSchema"; + String topicName = "subscription"; + getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false); + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .producerName("producer") + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(topicName) + .subscribe(); + + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS) + .build() + .get(); + + MessageId messageId = producer.newMessage(Schema.STRING, txn) + .value("testSendAndAckWithSchema") + .send(); + + txn.commit().get(); + + Message message = consumer.receive(); + assertEquals(new String(message.getValue(), StandardCharsets.UTF_8), "testSendAndAckWithSchema"); + assertNotNull(message.getSchemaVersion()); + } + @Test public void testGetTxnID() throws Exception { Transaction transaction = pulsarClient.newTransaction() diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java index 4cf3d6b81642b..87ce8f88dbb78 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java @@ -132,6 +132,21 @@ public interface Producer extends Closeable { * @since 2.7.0 */ TypedMessageBuilder newMessage(Transaction txn); + + /** + * Create a new message builder with transaction and schema, not required same parameterized type with the + * producer. + * + *

After the transaction commit, it will be made visible to consumer. + * + *

After the transaction abort, it will never be visible to consumer. + * + * @return a typed message builder that can be used to construct the message to be sent through this producer + * @see #newMessage() + */ + TypedMessageBuilder newMessage(Schema schema, + Transaction txn); + /** * Get the last sequence id that was published by this producer. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index 12e380fdd510c..b7085d28ee2a0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -79,6 +79,7 @@ public TypedMessageBuilder newMessage() { return new TypedMessageBuilderImpl<>(this, schema); } + @Override public TypedMessageBuilder newMessage(Schema schema) { checkArgument(schema != null); return new TypedMessageBuilderImpl<>(this, schema); @@ -92,6 +93,14 @@ public TypedMessageBuilder newMessage(Transaction txn) { return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn); } + @Override + public TypedMessageBuilder newMessage(Schema schema, + Transaction txn) { + checkArgument(txn instanceof TransactionImpl); + checkArgument(schema != null); + return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn); + } + abstract CompletableFuture internalSendAsync(Message message); abstract CompletableFuture internalSendWithTxnAsync(Message message, Transaction txn);