diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 4536bda907b6a..d8a927ef7382a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -51,6 +51,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; +import java.util.BitSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -92,6 +93,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConsumerBase; @@ -106,6 +108,8 @@ import org.apache.pulsar.client.impl.schema.writer.AvroWriter; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; +import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.CommandSuccess; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.compression.CompressionCodec; @@ -4692,4 +4696,58 @@ public void flush(ChannelHandlerContext ctx) throws Exception { consumer.close(); admin.topics().delete(topic, false); } + + @DataProvider(name = "ackArgs") + public Object[] ackArgs() { + int batchSize = 10; + BitSet bitSet = new BitSet(batchSize); + bitSet.set(0); + return new Object[][]{ + // no batch. + {CommandAck.AckType.Cumulative, new MessageIdImpl(1,1,1)}, + {CommandAck.AckType.Individual, new MessageIdImpl(1,1,1)}, + // batch without ackSet. + {CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0)}, + {CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0)}, + // batch with ackSet. + {CommandAck.AckType.Cumulative, new BatchMessageIdImpl(1,1,1,0, batchSize, bitSet)}, + {CommandAck.AckType.Individual, new BatchMessageIdImpl(1,1,1,0, batchSize, bitSet)} + }; + } + + @Test(dataProvider = "ackArgs") + public void testImmediateAckWhenReconnecting(CommandAck.AckType ackType, MessageId messageId) throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + final String subscriptionName = "s1"; + PulsarClient delayConnectClient = createDelayReconnectClient(); + + Consumer consumer = delayConnectClient.newConsumer(Schema.STRING).topic(topic) + .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS) + .subscriptionName(subscriptionName).subscribe(); + + admin.topics().unload(topic); + if (ackType == CommandAck.AckType.Individual) { + consumer.acknowledge(messageId); + } else { + consumer.acknowledgeCumulative(messageId); + } + + consumer.close(); + delayConnectClient.close(); + admin.topics().delete(topic, false); + } + + private PulsarClient createDelayReconnectClient() throws Exception { + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); + return InjectedClientCnxClientBuilder.create(clientBuilder, + (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { + @Override + protected void handleSuccess(CommandSuccess success) { + new Thread(() -> { + sleepSeconds(5); + super.handleSuccess(success); + }).start(); + } + }); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index d2aaafdd09d7d..13c8be70724fe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -871,6 +871,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { if (!(firstTimeConnect && hasParentConsumer) && getCurrentReceiverQueueSize() != 0) { increaseAvailablePermits(cnx, getCurrentReceiverQueueSize()); } + acknowledgmentsGroupingTracker.flush(); future.complete(null); }).exceptionally((e) -> { deregisterFromClientCnx(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 0cf776aea5942..1c15441dd2d8b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -231,8 +231,8 @@ private CompletableFuture addAcknowledgment(MessageIdAdv msgId, case Individual: return addIndividualAcknowledgment(msgId, batchMessageId, - __ -> doIndividualAck(__, properties), - __ -> doIndividualBatchAck(__, properties)); + __ -> doIndividualAck(__, properties, false), + __ -> doIndividualBatchAck(__, properties, false)); case Cumulative: if (batchMessageId != null) { consumer.onAcknowledgeCumulative(batchMessageId, null); @@ -240,11 +240,11 @@ private CompletableFuture addAcknowledgment(MessageIdAdv msgId, consumer.onAcknowledgeCumulative(msgId, null); } if (batchMessageId == null || MessageIdAdvUtils.acknowledge(batchMessageId, false)) { - return doCumulativeAck(msgId, properties, null); + return doCumulativeAck(msgId, properties, null, false); } else if (batchIndexAckEnabled) { - return doCumulativeBatchIndexAck(batchMessageId, properties); + return doCumulativeBatchIndexAck(batchMessageId, properties, false); } else { - doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null); + doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null, false); return CompletableFuture.completedFuture(null); } default: @@ -252,8 +252,10 @@ private CompletableFuture addAcknowledgment(MessageIdAdv msgId, } } - private CompletableFuture doIndividualAck(MessageIdAdv messageId, Map properties) { - if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { + private CompletableFuture doIndividualAck(MessageIdAdv messageId, Map properties, + boolean queueDueToConnecting) { + if (!queueDueToConnecting + && (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) { // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an // uncommon condition since it's only used for the compaction subscription. return doImmediateAck(messageId, AckType.Individual, properties, null); @@ -279,8 +281,10 @@ private CompletableFuture doIndividualAckAsync(MessageIdAdv messageId) { } private CompletableFuture doIndividualBatchAck(MessageIdAdv batchMessageId, - Map properties) { - if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { + Map properties, + boolean queueDueToConnecting) { + if (!queueDueToConnecting + && (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) { return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(), batchMessageId.getBatchSize(), AckType.Individual, properties); } else { @@ -302,9 +306,10 @@ private CompletableFuture doIndividualBatchAck(MessageIdAdv batchMessageId } private CompletableFuture doCumulativeAck(MessageIdAdv messageId, Map properties, - BitSetRecyclable bitSet) { + BitSetRecyclable bitSet, boolean queueDueToConnecting) { consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId)); - if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { + if (!queueDueToConnecting + && (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) { // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an // uncommon condition since it's only used for the compaction subscription. return doImmediateAck(messageId, AckType.Cumulative, properties, bitSet); @@ -342,15 +347,17 @@ private void doCumulativeAckAsync(MessageIdAdv msgId, BitSetRecyclable bitSet) { } private CompletableFuture doCumulativeBatchIndexAck(MessageIdAdv batchMessageId, - Map properties) { - if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { + Map properties, + boolean queueDueToConnecting) { + if (!queueDueToConnecting + && (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty()))) { return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(), batchMessageId.getBatchSize(), AckType.Cumulative, properties); } else { BitSetRecyclable bitSet = BitSetRecyclable.create(); bitSet.set(0, batchMessageId.getBatchSize()); bitSet.clear(0, batchMessageId.getBatchIndex() + 1); - return doCumulativeAck(batchMessageId, null, bitSet); + return doCumulativeAck(batchMessageId, null, bitSet, false); } } @@ -358,6 +365,13 @@ private CompletableFuture doImmediateAck(MessageIdAdv msgId, AckType ackTy BitSetRecyclable bitSet) { ClientCnx cnx = consumer.getClientCnx(); + if (cnx == null && consumer.getState() == HandlerState.State.Connecting) { + if (ackType == AckType.Cumulative) { + return doCumulativeAck(msgId, properties, bitSet, true); + } else { + return doIndividualAck(msgId, properties, true); + } + } if (cnx == null) { return FutureUtil.failedFuture(new PulsarClientException .ConnectException("Consumer connect fail! consumer state:" + consumer.getState())); @@ -369,6 +383,14 @@ private CompletableFuture doImmediateBatchIndexAck(MessageIdAdv msgId, int AckType ackType, Map properties) { ClientCnx cnx = consumer.getClientCnx(); + if (cnx == null && consumer.getState() == HandlerState.State.Connecting) { + if (ackType == AckType.Cumulative) { + return doCumulativeBatchIndexAck(msgId, properties, true); + } else { + return doIndividualBatchAck(msgId, properties, true); + } + } + if (cnx == null) { return FutureUtil.failedFuture(new PulsarClientException .ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));