[draft] [fix] [client] fix ack failed when consumer is reconnecting #21928

Open · wants to merge 3 commits into base: master
@@ -51,6 +51,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -92,6 +93,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
@@ -106,6 +108,8 @@
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSuccess;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
@@ -4692,4 +4696,58 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
consumer.close();
admin.topics().delete(topic, false);
}

@DataProvider(name = "ackArgs")
public Object[] ackArgs() {
int batchSize = 10;
BitSet bitSet = new BitSet(batchSize);
bitSet.set(0);
return new Object[][]{
// no batch.
{CommandAck.AckType.Cumulative, new MessageIdImpl(1,1,1)},
{CommandAck.AckType.Individual, new MessageIdImpl(1,1,1)},
// batch without ackSet.
{CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0)},
{CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0)},
// batch with ackSet.
{CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0, batchSize, bitSet)},
{CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0, batchSize, bitSet)}
};
}

@Test(dataProvider = "ackArgs")
public void testImmediateAckWhenReconnecting(CommandAck.AckType ackType, MessageId messageId) throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
final String subscriptionName = "s1";
PulsarClient delayConnectClient = createDelayReconnectClient();
Contributor:
The client should be closed?

Contributor Author:
Fixed


Consumer<String> consumer = delayConnectClient.newConsumer(Schema.STRING).topic(topic)
.acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
.subscriptionName(subscriptionName).subscribe();
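// acknowledgmentGroupTime(0) disables ack grouping, so acknowledge() below would normally send the ack to the broker immediately.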

admin.topics().unload(topic);
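// Unloading forces the consumer to reconnect; the delayed CommandSuccess (see createDelayReconnectClient) keeps it in Connecting state while the ack below is issued.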
if (ackType == CommandAck.AckType.Individual) {
consumer.acknowledge(messageId);
} else {
consumer.acknowledgeCumulative(messageId);
}

consumer.close();
delayConnectClient.close();
admin.topics().delete(topic, false);
}

private PulsarClient createDelayReconnectClient() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
return InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
@Override
protected void handleSuccess(CommandSuccess success) {
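// Deliver the broker's Success response on a delay so the consumer stays in Connecting state for ~5 seconds after a reconnect.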
new Thread(() -> {
sleepSeconds(5);
super.handleSuccess(success);
}).start();
}
});
}
}
@@ -871,6 +871,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
if (!(firstTimeConnect && hasParentConsumer) && getCurrentReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx, getCurrentReceiverQueueSize());
}
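// Flush any acks that were queued while the consumer was reconnecting.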
acknowledgmentsGroupingTracker.flush();
future.complete(null);
}).exceptionally((e) -> {
deregisterFromClientCnx();
@@ -231,29 +231,31 @@ private CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId,
case Individual:
return addIndividualAcknowledgment(msgId,
batchMessageId,
__ -> doIndividualAck(__, properties),
__ -> doIndividualBatchAck(__, properties));
__ -> doIndividualAck(__, properties, false),
__ -> doIndividualBatchAck(__, properties, false));
case Cumulative:
if (batchMessageId != null) {
consumer.onAcknowledgeCumulative(batchMessageId, null);
} else {
consumer.onAcknowledgeCumulative(msgId, null);
}
if (batchMessageId == null || MessageIdAdvUtils.acknowledge(batchMessageId, false)) {
return doCumulativeAck(msgId, properties, null);
return doCumulativeAck(msgId, properties, null, false);
} else if (batchIndexAckEnabled) {
return doCumulativeBatchIndexAck(batchMessageId, properties);
return doCumulativeBatchIndexAck(batchMessageId, properties, false);
} else {
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null);
doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null, false);
return CompletableFuture.completedFuture(null);
}
default:
throw new IllegalStateException("Unknown AckType: " + ackType);
}
}

private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties,
boolean queueDueToConnecting) {
Contributor:
Method invocations like f(args..., true) and f(args..., false) are hard to read unless you jump to the implementation of f and see what the boolean parameter means.

You should add a new method like queueIndividualAck and call it directly. For example,

    private CompletableFuture<Void> queueIndividualAck(MessageIdAdv messageId) {
        Optional<Lock> readLock = acquireReadLock();
        try {
            doIndividualAckAsync(messageId);
            return readLock.map(__ -> currentIndividualAckFuture).orElse(CompletableFuture.completedFuture(null));
        } finally {
            readLock.ifPresent(Lock::unlock);
            if (pendingIndividualAcks.size() >= maxAckGroupSize) {
                flush();
            }
        }
    }

    private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties) {
        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
            // uncommon condition since it's only used for the compaction subscription.
            return doImmediateAck(messageId, AckType.Individual, properties, null);
        } else {
            return queueIndividualAck(messageId);
        }
    }

Then you don't have to add a false argument to all existing doIndividualAck calls. And in doImmediateAck, you can call queueIndividualAck directly.

    private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId, AckType ackType, Map<String, Long> properties,
                                                   BitSetRecyclable bitSet) {
        ClientCnx cnx = consumer.getClientCnx();

        if (cnx == null && consumer.getState() == HandlerState.State.Connecting) {
            if (ackType == AckType.Cumulative) {
                return queueCumulativeAck(msgId);
            } else {
                return queueIndividualAck(msgId);
            }
        }

Contributor Author:
Sure, I will change the implementation of this PR as suggested in #21928 (comment)

if (!queueDueToConnecting
&& (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) {
Comment on lines +257 to +258
Contributor:
Will it break the behavior of

// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction subscription.

If the consumer is reconnecting but the ack has properties, we will also group the acks, which is not expected?

Contributor (@BewareMyPower, Jan 19, 2024):
@codelipenghui Yes. But this behavior is only added for the TwoPhaseCompactor to avoid latency when seeking; the client side has no way to attach properties. See

.thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId,
Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))

Currently, if acknowledge is called during reconnection, phaseTwoSeekThenLoop will fail.

From my perspective, we should also queue the ACK requests and flush them after the connection is re-established. @poorbarcode

Contributor Author (@poorbarcode, Jan 21, 2024):
Agreed with @BewareMyPower

I am trying to split this PR into the following parts:

  • part-1: add a new component, AcknowledgmentCache, to support caching acknowledgments together with their properties argument (a rough sketch follows this list).
  • part-2: split PersistentAcknowledgmentsGroupingTracker into two implementations:
    • one that caches and batches the acks
    • one that acks immediately
  • part-3: fix the issue
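
For illustration only, a minimal sketch of what the part-1 AcknowledgmentCache component could look like, assuming it simply records acks (with their AckType and properties) while the consumer is disconnected and replays them on the new connection. The method names and signatures here are hypothetical, not the eventual implementation:

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.impl.ClientCnx;
    import org.apache.pulsar.common.api.proto.CommandAck.AckType;

    // Hypothetical component: caches acknowledgments (including their properties)
    // that are requested while the consumer has no active connection.
    interface AcknowledgmentCache {

        // Record an ack that cannot be sent right now; the returned future completes
        // once the ack is eventually flushed to the broker.
        CompletableFuture<Void> add(MessageId messageId, AckType ackType, Map<String, Long> properties);

        // Replay all cached acks on the newly established connection and complete their futures.
        void flushTo(ClientCnx cnx);
    }

Part-2 would then decide, per tracker implementation, whether an incoming ack is batched, sent immediately, or handed to such a cache.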

Contributor Author:
Marked current PR as Draft

Contributor (@BewareMyPower, Jan 22, 2024):
split PersistentAcknowledgmentsGroupingTracker into two implementations

It's reasonable. BTW, the C++ client is already implemented that way:

https://github.com/apache/pulsar-client-cpp/blob/72b7311aeef32e28a28e926da686aaf948e8f948/lib/ConsumerImpl.cc#L201C1-L215C6

Though the naming is not good (but it is consistent with other impl classes in the library).

// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction subscription.
return doImmediateAck(messageId, AckType.Individual, properties, null);
@@ -279,8 +281,10 @@ private CompletableFuture<Void> doIndividualAckAsync(MessageIdAdv messageId) {
}

private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv batchMessageId,
Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
Map<String, Long> properties,
boolean queueDueToConnecting) {
if (!queueDueToConnecting
&& (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) {
return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
batchMessageId.getBatchSize(), AckType.Individual, properties);
} else {
@@ -302,9 +306,10 @@ private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv batchMessageId
}

private CompletableFuture<Void> doCumulativeAck(MessageIdAdv messageId, Map<String, Long> properties,
BitSetRecyclable bitSet) {
BitSetRecyclable bitSet, boolean queueDueToConnecting) {
consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
if (!queueDueToConnecting
&& (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) {
// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction subscription.
return doImmediateAck(messageId, AckType.Cumulative, properties, bitSet);
@@ -342,22 +347,31 @@ private void doCumulativeAckAsync(MessageIdAdv msgId, BitSetRecyclable bitSet) {
}

private CompletableFuture<Void> doCumulativeBatchIndexAck(MessageIdAdv batchMessageId,
Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
Map<String, Long> properties,
boolean queueDueToConnecting) {
if (!queueDueToConnecting
&& (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) {
return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
batchMessageId.getBatchSize(), AckType.Cumulative, properties);
} else {
BitSetRecyclable bitSet = BitSetRecyclable.create();
bitSet.set(0, batchMessageId.getBatchSize());
bitSet.clear(0, batchMessageId.getBatchIndex() + 1);
return doCumulativeAck(batchMessageId, null, bitSet);
return doCumulativeAck(batchMessageId, null, bitSet, false);
}
}

private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId, AckType ackType, Map<String, Long> properties,
BitSetRecyclable bitSet) {
ClientCnx cnx = consumer.getClientCnx();
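// While reconnecting, cnx is null but the state is Connecting: queue the ack instead of failing, so it is flushed once the connection is re-established.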

if (cnx == null && consumer.getState() == HandlerState.State.Connecting) {
if (ackType == AckType.Cumulative) {
return doCumulativeAck(msgId, properties, bitSet, true);
} else {
return doIndividualAck(msgId, properties, true);
}
}
if (cnx == null) {
return FutureUtil.failedFuture(new PulsarClientException
.ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));
@@ -369,6 +383,14 @@ private CompletableFuture<Void> doImmediateBatchIndexAck(MessageIdAdv msgId, int
AckType ackType, Map<String, Long> properties) {
ClientCnx cnx = consumer.getClientCnx();

if (cnx == null && consumer.getState() == HandlerState.State.Connecting) {
if (ackType == AckType.Cumulative) {
return doCumulativeBatchIndexAck(msgId, properties, true);
} else {
return doIndividualBatchAck(msgId, properties, true);
}
}

if (cnx == null) {
return FutureUtil.failedFuture(new PulsarClientException
.ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));