Skip to content

Commit 536dce3

Browse files
Omar YasinÓmar Kjartan Yasin
andauthored
[improve][client] PIP-407 Add newMessage with schema and transactions (#23942)
Co-authored-by: Ómar Kjartan Yasin <[email protected]>
1 parent 669ab61 commit 536dce3

File tree

3 files changed

+61
-0
lines changed

3 files changed

+61
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import static org.mockito.Mockito.when;
3636
import static org.testng.Assert.assertEquals;
3737
import static org.testng.Assert.assertFalse;
38+
import static org.testng.Assert.assertNotNull;
3839
import static org.testng.Assert.assertTrue;
3940
import static org.testng.Assert.fail;
4041
import io.netty.buffer.Unpooled;
@@ -488,6 +489,42 @@ public void testAsyncSendOrAckForSingleFuture() throws Exception {
488489
});
489490
}
490491

492+
@Test
493+
public void testSendAndAckWithSchema() throws Exception {
494+
String topic = NAMESPACE1 + "/testAsyncSendAndAckWithSchema";
495+
String topicName = "subscription";
496+
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
497+
498+
@Cleanup
499+
Producer<byte[]> producer = pulsarClient.newProducer()
500+
.topic(topic)
501+
.producerName("producer")
502+
.sendTimeout(0, TimeUnit.SECONDS)
503+
.create();
504+
505+
@Cleanup
506+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
507+
.topic(topic)
508+
.subscriptionType(SubscriptionType.Exclusive)
509+
.subscriptionName(topicName)
510+
.subscribe();
511+
512+
Transaction txn = pulsarClient.newTransaction()
513+
.withTransactionTimeout(10, TimeUnit.SECONDS)
514+
.build()
515+
.get();
516+
517+
MessageId messageId = producer.newMessage(Schema.STRING, txn)
518+
.value("testSendAndAckWithSchema")
519+
.send();
520+
521+
txn.commit().get();
522+
523+
Message<byte[]> message = consumer.receive();
524+
assertEquals(new String(message.getValue(), StandardCharsets.UTF_8), "testSendAndAckWithSchema");
525+
assertNotNull(message.getSchemaVersion());
526+
}
527+
491528
@Test
492529
public void testGetTxnID() throws Exception {
493530
Transaction transaction = pulsarClient.newTransaction()

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,21 @@ public interface Producer<T> extends Closeable {
191191
* @since 2.7.0
192192
*/
193193
TypedMessageBuilder<T> newMessage(Transaction txn);
194+
195+
/**
196+
* Create a new message builder with transaction and schema, not required same parameterized type with the
197+
* producer.
198+
*
199+
* <p>After the transaction commit, it will be made visible to consumer.
200+
*
201+
* <p>After the transaction abort, it will never be visible to consumer.
202+
*
203+
* @return a typed message builder that can be used to construct the message to be sent through this producer
204+
* @see #newMessage()
205+
*/
206+
<V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
207+
Transaction txn);
208+
194209
/**
195210
* Get the last sequence id that was published by this producer.
196211
*

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public TypedMessageBuilder<T> newMessage() {
7979
return new TypedMessageBuilderImpl<>(this, schema);
8080
}
8181

82+
@Override
8283
public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema) {
8384
checkArgument(schema != null);
8485
return new TypedMessageBuilderImpl<>(this, schema);
@@ -92,6 +93,14 @@ public TypedMessageBuilder<T> newMessage(Transaction txn) {
9293
return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
9394
}
9495

96+
@Override
97+
public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
98+
Transaction txn) {
99+
checkArgument(txn instanceof TransactionImpl);
100+
checkArgument(schema != null);
101+
return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
102+
}
103+
95104
abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message);
96105

97106
abstract CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn);

0 commit comments

Comments
 (0)