Skip to content

Commit 1c2d181

Browse files
committed
[improve][client] Pulsar client supports multi-topic messageId deserialization to ack messages
1 parent 1545396 commit 1c2d181

File tree

6 files changed

+108
-4
lines changed

6 files changed

+108
-4
lines changed

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java

+68
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424
import static org.mockito.Mockito.times;
2525
import static org.mockito.Mockito.verify;
2626
import static org.testng.Assert.assertFalse;
27+
import static org.testng.Assert.assertNull;
2728
import static org.testng.Assert.assertEquals;
2829
import static org.testng.Assert.assertTrue;
2930

3031
import com.google.common.collect.Lists;
32+
import com.google.common.collect.Range;
33+
3134
import java.util.ArrayList;
3235
import java.util.Collections;
3336
import java.util.HashMap;
@@ -42,6 +45,9 @@
4245
import java.util.stream.Collectors;
4346
import java.util.stream.IntStream;
4447
import lombok.Cleanup;
48+
49+
import org.apache.bookkeeper.mledger.impl.PositionImpl;
50+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
4551
import org.apache.pulsar.client.admin.PulsarAdminException;
4652
import org.apache.pulsar.client.impl.ClientBuilderImpl;
4753
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -351,4 +357,66 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
351357
}
352358
consumer.close();
353359
}
360+
361+
/**
362+
* It tests acking of messageId created from byte[] and validates client acks messages successfully.
363+
* @throws Exception
364+
*/
365+
@Test
366+
public void testMultiTopicAckWithByteMessageId() throws Exception {
367+
String topicName = newTopicName();
368+
int numPartitions = 2;
369+
int numMessages = 100000;
370+
admin.topics().createPartitionedTopic(topicName, numPartitions);
371+
372+
Producer<Long>[] producers = new Producer[numPartitions];
373+
374+
for (int i = 0; i < numPartitions; i++) {
375+
producers[i] = pulsarClient.newProducer(Schema.INT64)
376+
// produce to each partition directly so that order can be maintained in sending
377+
.topic(topicName + "-partition-" + i).enableBatching(true).maxPendingMessages(30000)
378+
.maxPendingMessagesAcrossPartitions(60000).batchingMaxMessages(10000)
379+
.batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxBytes(4 * 1024 * 1024)
380+
.blockIfQueueFull(true).create();
381+
}
382+
383+
@Cleanup
384+
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
385+
// consume on the partitioned topic
386+
.topic(topicName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
387+
.receiverQueueSize(numMessages).subscriptionName(methodName).subscribe();
388+
389+
// produce sequence numbers to each partition topic
390+
long sequenceNumber = 1L;
391+
for (int i = 0; i < numMessages; i++) {
392+
for (Producer<Long> producer : producers) {
393+
producer.newMessage().value(sequenceNumber).sendAsync();
394+
}
395+
sequenceNumber++;
396+
}
397+
for (Producer<Long> producer : producers) {
398+
producer.flush();
399+
producer.close();
400+
}
401+
402+
// receive and validate sequences in the partitioned topic
403+
Map<String, AtomicLong> receivedSequences = new HashMap<>();
404+
int receivedCount = 0;
405+
while (receivedCount < numPartitions * numMessages) {
406+
Message<Long> message = consumer.receiveAsync().get(5, TimeUnit.SECONDS);
407+
byte[] idByte = message.getMessageId().toByteArray();
408+
MessageId id = MessageId.fromByteArray(idByte);
409+
consumer.acknowledge(id);
410+
receivedCount++;
411+
AtomicLong receivedSequenceCounter = receivedSequences.computeIfAbsent(message.getTopicName(),
412+
k -> new AtomicLong(1L));
413+
Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement());
414+
}
415+
Assert.assertEquals(numPartitions * numMessages, receivedCount);
416+
consumer.close();
417+
418+
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName + "-partition-0", false).get().get();
419+
Range<PositionImpl> range = topic.getManagedLedger().getCursors().iterator().next().getLastIndividualDeletedRange();
420+
assertNull(range);
421+
}
354422
}

Diff for: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java

+4
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public Impl(String topic, MessageId messageId) {
5959
this.topic = topic;
6060
this.messageId = (MessageIdAdv) messageId;
6161
}
62+
63+
protected MessageId getMessageId() {
64+
return messageId;
65+
}
6266

6367
@Override
6468
public byte[] toByteArray() {

Diff for: pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java

+5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public byte[] toByteArray() {
7878
return toByteArray(batchIndex, batchSize);
7979
}
8080

81+
@Override
82+
protected byte[] toByteArray(String topic) {
83+
return toByteArray(batchIndex, batchSize, topic);
84+
}
85+
8186
@Deprecated
8287
public boolean ackIndividual() {
8388
return MessageIdAdvUtils.acknowledge(this, true);

Diff for: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.netty.util.concurrent.FastThreadLocal;
2424
import java.io.IOException;
2525
import java.util.Objects;
26+
import org.apache.commons.lang3.StringUtils;
2627
import org.apache.pulsar.client.api.MessageId;
2728
import org.apache.pulsar.client.api.MessageIdAdv;
2829
import org.apache.pulsar.common.api.proto.MessageIdData;
@@ -93,7 +94,7 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
9394
throw new IOException(e);
9495
}
9596

96-
MessageIdImpl messageId;
97+
MessageId messageId;
9798
if (idData.hasBatchIndex()) {
9899
if (idData.hasBatchSize()) {
99100
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
@@ -112,6 +113,11 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
112113
messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
113114
}
114115

116+
if (idData.hasTopicName()) {
117+
String topicName = idData.getTopicName();
118+
messageId = new TopicMessageIdImpl(topicName, topicName, messageId);
119+
}
120+
115121
return messageId;
116122
}
117123

@@ -174,7 +180,18 @@ protected MessageIdData writeMessageIdData(MessageIdData msgId, int batchIndex,
174180

175181
// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
176182
protected byte[] toByteArray(int batchIndex, int batchSize) {
183+
return toByteArray(batchIndex, batchSize, null);
184+
}
185+
186+
protected byte[] toByteArray(String topic) {
187+
return toByteArray(-1, 0, topic);
188+
}
189+
190+
protected byte[] toByteArray(int batchIndex, int batchSize, String topicName) {
177191
MessageIdData msgId = writeMessageIdData(null, batchIndex, batchSize);
192+
if (StringUtils.isNotBlank(topicName)) {
193+
msgId.setTopicName(topicName);
194+
}
178195

179196
int size = msgId.getSerializedSize();
180197
ByteBuf serialized = Unpooled.buffer(size, size);
@@ -186,6 +203,6 @@ protected byte[] toByteArray(int batchIndex, int batchSize) {
186203
@Override
187204
public byte[] toByteArray() {
188205
// there is no message batch so we pass -1
189-
return toByteArray(-1, 0);
206+
return toByteArray(-1, 0, null);
190207
}
191208
}

Diff for: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,14 @@ public boolean equals(Object obj) {
6262
public int hashCode() {
6363
return super.hashCode();
6464
}
65-
}
65+
66+
@Override
67+
public byte[] toByteArray() {
68+
MessageId id = getMessageId();
69+
if (id instanceof MessageIdImpl) {
70+
return ((MessageIdImpl) id).toByteArray(-1, 0, getOwnerTopic());
71+
} else {
72+
return id.toByteArray();
73+
}
74+
}
75+
}

Diff for: pulsar-common/src/main/proto/PulsarApi.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ message MessageIdData {
6262
optional int32 batch_index = 4 [default = -1];
6363
repeated int64 ack_set = 5;
6464
optional int32 batch_size = 6;
65-
6665
// For the chunk message id, we need to specify the first chunk message id.
6766
optional MessageIdData first_chunk_message_id = 7;
67+
optional string topicName = 8;
6868
}
6969

7070
message KeyValue {

0 commit comments

Comments
 (0)