[improve][client] Support multi-topic messageId deserialization to ack messages #19944

Open — wants to merge 1 commit into base: master
@@ -24,10 +24,13 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Lists;
import com.google.common.collect.Range;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -42,6 +45,9 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -351,4 +357,66 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
}
consumer.close();
}

    /**
     * Verifies that a MessageId deserialized from its byte[] form can be used to
     * acknowledge messages on a multi-topic (partitioned) consumer.
     */
@Test
public void testMultiTopicAckWithByteMessageId() throws Exception {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

Producer<Long>[] producers = new Producer[numPartitions];

for (int i = 0; i < numPartitions; i++) {
producers[i] = pulsarClient.newProducer(Schema.INT64)
// produce to each partition directly so that order can be maintained in sending
.topic(topicName + "-partition-" + i).enableBatching(true).maxPendingMessages(30000)
.maxPendingMessagesAcrossPartitions(60000).batchingMaxMessages(10000)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxBytes(4 * 1024 * 1024)
.blockIfQueueFull(true).create();
}

@Cleanup
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
// consume on the partitioned topic
.topic(topicName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(numMessages).subscriptionName(methodName).subscribe();

// produce sequence numbers to each partition topic
long sequenceNumber = 1L;
for (int i = 0; i < numMessages; i++) {
for (Producer<Long> producer : producers) {
producer.newMessage().value(sequenceNumber).sendAsync();
}
sequenceNumber++;
}
for (Producer<Long> producer : producers) {
producer.flush();
producer.close();
}

// receive and validate sequences in the partitioned topic
Map<String, AtomicLong> receivedSequences = new HashMap<>();
int receivedCount = 0;
while (receivedCount < numPartitions * numMessages) {
Message<Long> message = consumer.receiveAsync().get(5, TimeUnit.SECONDS);
byte[] idByte = message.getMessageId().toByteArray();
MessageId id = MessageId.fromByteArray(idByte);
consumer.acknowledge(id);
receivedCount++;
AtomicLong receivedSequenceCounter = receivedSequences.computeIfAbsent(message.getTopicName(),
k -> new AtomicLong(1L));
Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement());
}
        Assert.assertEquals(receivedCount, numPartitions * numMessages);
consumer.close();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName + "-partition-0", false).get().get();
Range<PositionImpl> range = topic.getManagedLedger().getCursors().iterator().next().getLastIndividualDeletedRange();
assertNull(range);
}
}
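The round trip the test exercises — serialize a message id (including its topic), then rebuild an equivalent id from the bytes and ack with it — can be sketched with simplified stand-in classes. `SimpleMessageId` and the ad-hoc byte layout below are illustrative only; the real classes serialize through the `MessageIdData` protobuf:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical stand-in for MessageIdImpl/TopicMessageIdImpl; the real classes
// serialize via the MessageIdData protobuf, not this ad-hoc layout.
final class SimpleMessageId {
    final long ledgerId;
    final long entryId;
    final String topic; // null when the id is not topic-qualified

    SimpleMessageId(long ledgerId, long entryId, String topic) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.topic = topic;
    }

    byte[] toByteArray() {
        byte[] topicBytes = topic == null ? new byte[0] : topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + 8 + 4 + topicBytes.length);
        buf.putLong(ledgerId).putLong(entryId).putInt(topicBytes.length).put(topicBytes);
        return buf.array();
    }

    static SimpleMessageId fromByteArray(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        long ledgerId = buf.getLong();
        long entryId = buf.getLong();
        int topicLen = buf.getInt();
        String topic = null;
        if (topicLen > 0) { // analogous to the patch's "if (idData.hasTopicName())"
            byte[] topicBytes = new byte[topicLen];
            buf.get(topicBytes);
            topic = new String(topicBytes, StandardCharsets.UTF_8);
        }
        return new SimpleMessageId(ledgerId, entryId, topic);
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof SimpleMessageId)) {
            return false;
        }
        SimpleMessageId other = (SimpleMessageId) o;
        return ledgerId == other.ledgerId && entryId == other.entryId
                && Objects.equals(topic, other.topic);
    }

    @Override public int hashCode() {
        return Objects.hash(ledgerId, entryId, topic);
    }
}
```

The point the test makes is the same one this sketch makes: without the topic carried inside the serialized bytes, the deserialized id cannot be routed back to the right partition's consumer for acknowledgment.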
@@ -60,6 +60,10 @@ public Impl(String topic, MessageId messageId) {
this.messageId = (MessageIdAdv) messageId;
}

protected MessageId getMessageId() {
return messageId;
}

@Override
public byte[] toByteArray() {
return messageId.toByteArray();
@@ -78,6 +78,11 @@ public byte[] toByteArray() {
return toByteArray(batchIndex, batchSize);
}

@Override
protected byte[] toByteArray(String topic) {
return toByteArray(batchIndex, batchSize, topic);
}

@Deprecated
public boolean ackIndividual() {
return MessageIdAdvUtils.acknowledge(this, true);
@@ -23,6 +23,7 @@
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.common.api.proto.MessageIdData;
@@ -93,7 +94,7 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
throw new IOException(e);
}

MessageIdImpl messageId;
MessageId messageId;
if (idData.hasBatchIndex()) {
if (idData.hasBatchSize()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
@@ -112,6 +113,11 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
}

if (idData.hasTopicName()) {
String topicName = idData.getTopicName();
messageId = new TopicMessageIdImpl(topicName, topicName, messageId);
}

return messageId;
}

@@ -174,7 +180,18 @@ protected MessageIdData writeMessageIdData(MessageIdData msgId, int batchIndex,

// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
protected byte[] toByteArray(int batchIndex, int batchSize) {
return toByteArray(batchIndex, batchSize, null);
}

protected byte[] toByteArray(String topic) {
return toByteArray(-1, 0, topic);
}

protected byte[] toByteArray(int batchIndex, int batchSize, String topicName) {
MessageIdData msgId = writeMessageIdData(null, batchIndex, batchSize);
if (StringUtils.isNotBlank(topicName)) {
msgId.setTopicName(topicName);
}

int size = msgId.getSerializedSize();
ByteBuf serialized = Unpooled.buffer(size, size);
@@ -186,6 +203,6 @@ protected byte[] toByteArray(int batchIndex, int batchSize) {
@Override
public byte[] toByteArray() {
// there is no message batch so we pass -1
return toByteArray(-1, 0);
return toByteArray(-1, 0, null);
}
}
@@ -62,4 +62,14 @@ public boolean equals(Object obj) {
public int hashCode() {
return super.hashCode();
}
}

@Override
public byte[] toByteArray() {
MessageId id = getMessageId();
if (id instanceof MessageIdImpl) {
return ((MessageIdImpl) id).toByteArray(-1, 0, getOwnerTopic());
} else {
return id.toByteArray();
}
}
}
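The override added above dispatches on the runtime type of the wrapped id: a `MessageIdImpl` gets the topic-aware serialization, anything else falls back to plain `toByteArray()`. A minimal standalone sketch of that dispatch pattern, using hypothetical stand-in types (`Id`, `PlainId`, `TopicAwareId`, `TopicWrapper` are not Pulsar classes):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for the MessageId hierarchy touched by the patch.
interface Id {
    byte[] toByteArray();
}

class PlainId implements Id {
    @Override public byte[] toByteArray() {
        return new byte[] {0}; // plain serialization, no topic carried
    }
}

class TopicAwareId extends PlainId {
    byte[] toByteArray(String topic) { // topic-qualified serialization
        byte[] t = topic.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[1 + t.length];
        out[0] = 1; // marker: topic present
        System.arraycopy(t, 0, out, 1, t.length);
        return out;
    }
}

class TopicWrapper implements Id {
    private final Id inner;
    private final String ownerTopic;

    TopicWrapper(Id inner, String ownerTopic) {
        this.inner = inner;
        this.ownerTopic = ownerTopic;
    }

    @Override public byte[] toByteArray() {
        // Mirrors the patched TopicMessageIdImpl.toByteArray(): use the
        // topic-aware overload only when the wrapped id supports it.
        if (inner instanceof TopicAwareId) {
            return ((TopicAwareId) inner).toByteArray(ownerTopic);
        }
        return inner.toByteArray();
    }
}
```

The `instanceof` guard matters because the wrapper cannot assume every `MessageId` implementation exposes the topic-aware overload; unknown implementations keep their existing byte format.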
2 changes: 1 addition & 1 deletion pulsar-common/src/main/proto/PulsarApi.proto
@@ -62,9 +62,9 @@ message MessageIdData {
optional int32 batch_index = 4 [default = -1];
repeated int64 ack_set = 5;
optional int32 batch_size = 6;

// For the chunk message id, we need to specify the first chunk message id.
optional MessageIdData first_chunk_message_id = 7;
optional string topicName = 8;
Review comments:

Contributor: I think we need a PIP to add new fields to the Pulsar API. In PIP-224 I proposed adding an extra class, TopicMessageIdSerDes, for the serialization of TopicMessageId, but the related PR is not present yet. If we can introduce such a new field, I think we no longer need to implement TopicMessageIdSerDes.

Contributor: As I described here, the limitation of PIP-224 is caused by the lack of a topic name field in MessageIdData. I think it would be better to add this field before the 3.0.0 release, but I'd like to hear more voices about this change.

Contributor (Author): @BewareMyPower I replied on the PR: serializing and deserializing is expensive, and on top of that, having different APIs for different use cases creates a really bad experience for users. I strongly feel we should avoid such APIs and complexity when things can be solved with a simple, straightforward change using the same API and without creating a bad user experience. I think we should consider this simple change, which costs nothing in performance and avoids API incompatibility and confusing usage. So, I would like to avoid this change: BewareMyPower#20

Contributor (@BewareMyPower, Apr 4, 2023): Actually, I'm +1 on your PR, and I think the API proposed in PIP-224 could be replaced by this PR.

> few committers block all the PRs without any reason

However, the reason I blocked this PR is the current requirement for a PIP:

> Any change to the wire protocol APIs

This PR brings a change to the wire protocol API, and I think it should not pass without a proposal. /cc @merlimat @eolivelli @codelipenghui

Contributor: Hmm, I think it's better to add a description clarifying that the new field will not apply to RPC calls; it is only used by the client side to carry the topic name.

Contributor: Hi, I come from Kafka. I think that when a client enables group offset commits, this field could be grouped by topicName, giving a much smaller RPC message to send on the wire. See Kafka's OffsetCommit RPC: https://kafka.apache.org/protocol.html#The_Messages_OffsetCommit :-)

}

message KeyValue {
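The compatibility concern raised in the review — whether adding a field to `MessageIdData` breaks the wire protocol — hinges on protobuf's unknown-field handling: a decoder that predates `optional string topicName = 8` simply skips the field. A standalone sketch (not Pulsar's generated protobuf code) showing how field 8 is laid out on the wire and how an older decoder skips it:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class ProtoTopicNameDemo {
    // Encode "topicName" as protobuf field 8, wire type 2 (length-delimited):
    // tag = (field_number << 3) | wire_type = (8 << 3) | 2 = 0x42.
    static byte[] encodeTopicNameField(String topicName) {
        byte[] utf8 = topicName.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x42);                 // tag for field 8, wire type 2
        writeVarint(out, utf8.length);   // length prefix
        out.write(utf8, 0, utf8.length); // payload
        return out.toByteArray();
    }

    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // A decoder built before field 8 existed skips it: read the tag, see an
    // unknown field number, and consume the length-delimited payload whole.
    static int skipUnknownField(byte[] data, int pos) {
        int tag = data[pos++] & 0xFF;
        int wireType = tag & 0x7;
        if (wireType == 2) {             // length-delimited
            int len = 0, shift = 0, b;
            do {
                b = data[pos++] & 0xFF;
                len |= (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            pos += len;                  // skip the payload
        }
        return pos;                      // position after the skipped field
    }
}
```

This skip-on-unknown behavior is what makes adding an `optional` field backward compatible at the encoding level; the review discussion is about project process (requiring a PIP for wire-protocol changes) and about documenting that the field is client-side only, not about decoder breakage.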