Skip to content

Commit 59dd60a

Browse files
author
Ómar Kjartan Yasin
committed
[improve][client] Add newMessage with schema and transactions
Pulsar Client allows callers to create messages with a schema or a transaction, but not both. This commit adds a new method in the producer that allows callers to create a message with both a schema and transaction.
1 parent cdab2d6 commit 59dd60a

File tree

3 files changed

+61
-0
lines changed

3 files changed

+61
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import static org.mockito.Mockito.when;
3636
import static org.testng.Assert.assertEquals;
3737
import static org.testng.Assert.assertFalse;
38+
import static org.testng.Assert.assertNotNull;
3839
import static org.testng.Assert.assertTrue;
3940
import static org.testng.Assert.fail;
4041
import io.netty.buffer.Unpooled;
@@ -484,6 +485,42 @@ public void testAsyncSendOrAckForSingleFuture() throws Exception {
484485
});
485486
}
486487

488+
@Test
489+
public void testSendAndAckWithSchema() throws Exception {
490+
String topic = NAMESPACE1 + "/testAsyncSendAndAckWithSchema";
491+
String topicName = "subscription";
492+
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
493+
494+
@Cleanup
495+
Producer<byte[]> producer = pulsarClient.newProducer()
496+
.topic(topic)
497+
.producerName("producer")
498+
.sendTimeout(0, TimeUnit.SECONDS)
499+
.create();
500+
501+
@Cleanup
502+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
503+
.topic(topic)
504+
.subscriptionType(SubscriptionType.Exclusive)
505+
.subscriptionName(topicName)
506+
.subscribe();
507+
508+
Transaction txn = pulsarClient.newTransaction()
509+
.withTransactionTimeout(10, TimeUnit.SECONDS)
510+
.build()
511+
.get();
512+
513+
MessageId messageId = producer.newMessage(Schema.STRING, txn)
514+
.value("testSendAndAckWithSchema")
515+
.send();
516+
517+
txn.commit().get();
518+
519+
Message<byte[]> message = consumer.receive();
520+
assertEquals(new String(message.getValue(), StandardCharsets.UTF_8), "testSendAndAckWithSchema");
521+
assertNotNull(message.getSchemaVersion());
522+
}
523+
487524
@Test
488525
public void testGetTxnID() throws Exception {
489526
Transaction transaction = pulsarClient.newTransaction()

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,21 @@ public interface Producer<T> extends Closeable {
132132
* @since 2.7.0
133133
*/
134134
TypedMessageBuilder<T> newMessage(Transaction txn);
135+
136+
/**
137+
* Create a new message builder with transaction and schema, not required same parameterized type with the
138+
* producer.
139+
*
140+
* <p>After the transaction commit, it will be made visible to consumer.
141+
*
142+
* <p>After the transaction abort, it will never be visible to consumer.
143+
*
144+
* @return a typed message builder that can be used to construct the message to be sent through this producer
145+
* @see #newMessage()
146+
*/
147+
<V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
148+
Transaction txn);
149+
135150
/**
136151
* Get the last sequence id that was published by this producer.
137152
*

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public TypedMessageBuilder<T> newMessage() {
7979
return new TypedMessageBuilderImpl<>(this, schema);
8080
}
8181

82+
@Override
8283
public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema) {
8384
checkArgument(schema != null);
8485
return new TypedMessageBuilderImpl<>(this, schema);
@@ -92,6 +93,14 @@ public TypedMessageBuilder<T> newMessage(Transaction txn) {
9293
return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
9394
}
9495

96+
@Override
97+
public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
98+
Transaction txn) {
99+
checkArgument(txn instanceof TransactionImpl);
100+
checkArgument(schema != null);
101+
return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
102+
}
103+
95104
abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message);
96105

97106
abstract CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn);

0 commit comments

Comments
 (0)