Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -484,6 +485,42 @@ public void testAsyncSendOrAckForSingleFuture() throws Exception {
});
}

@Test
public void testSendAndAckWithSchema() throws Exception {
String topic = NAMESPACE1 + "/testAsyncSendAndAckWithSchema";
String topicName = "subscription";
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer")
.sendTimeout(0, TimeUnit.SECONDS)
.create();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(topicName)
.subscribe();

Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS)
.build()
.get();

MessageId messageId = producer.newMessage(Schema.STRING, txn)
.value("testSendAndAckWithSchema")
.send();

txn.commit().get();

Message<byte[]> message = consumer.receive();
assertEquals(new String(message.getValue(), StandardCharsets.UTF_8), "testSendAndAckWithSchema");
assertNotNull(message.getSchemaVersion());
}

@Test
public void testGetTxnID() throws Exception {
Transaction transaction = pulsarClient.newTransaction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,21 @@ public interface Producer<T> extends Closeable {
* @since 2.7.0
*/
TypedMessageBuilder<T> newMessage(Transaction txn);

/**
* Create a new message builder with transaction and schema, not required same parameterized type with the
* producer.
*
* <p>After the transaction commit, it will be made visible to consumer.
*
* <p>After the transaction abort, it will never be visible to consumer.
*
* @return a typed message builder that can be used to construct the message to be sent through this producer
* @see #newMessage()
*/
<V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
Transaction txn);

/**
* Get the last sequence id that was published by this producer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public TypedMessageBuilder<T> newMessage() {
return new TypedMessageBuilderImpl<>(this, schema);
}

@Override
public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema) {
checkArgument(schema != null);
return new TypedMessageBuilderImpl<>(this, schema);
Expand All @@ -92,6 +93,14 @@ public TypedMessageBuilder<T> newMessage(Transaction txn) {
return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
}

@Override
public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
Transaction txn) {
checkArgument(txn instanceof TransactionImpl);
checkArgument(schema != null);
return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
}

abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message);

abstract CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn);
Expand Down
Loading