Skip to content

[fix][client] Fix client redeliver epoch bigger than broker consumer epoch #20032

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -2034,20 +2034,31 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
remoteAddress, redeliver.getConsumerId(),
redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null);
}

CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
if (redeliver.hasConsumerEpoch()) {
consumer.redeliverUnacknowledgedMessages(redeliver.getConsumerEpoch());
boolean hasConsumerEpoch = redeliver.hasConsumerEpoch();
List<MessageIdData> messageIdsList = redeliver.getMessageIdsList();
int messageIdsCount = redeliver.getMessageIdsCount();
long consumerId = redeliver.getConsumerId();
long consumerEpoch = redeliver.getConsumerEpoch();
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture != null) {
consumerFuture.thenAccept((consumer) -> {
if (messageIdsCount > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(messageIdsList);
} else {
consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH);
if (hasConsumerEpoch) {
consumer.redeliverUnacknowledgedMessages(consumerEpoch);
} else {
consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH);
}
}
}
}).exceptionally(e -> {
// if consumerFuture completed exceptionally, don't need to process this redeliver command
// because, consumer will reconnect
log.warn("[{}] ignore this redeliverUnacknowledged request from consumer {}, consumerEpoch {}",
remoteAddress, redeliver.getConsumerId(),
redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null, e);
return null;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,20 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc
}

private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
// redeliver epoch is bigger than current consumer epoch, so don't need to handle this redeliver request
if (consumerEpoch < consumer.getConsumerEpoch()) {
log.warn("[{}-{}] Update epoch, old epoch [{}] bigger than new epoch [{}]",
name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
return;
}

if (consumerEpoch > consumer.getConsumerEpoch()) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]",
name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
}
consumer.setConsumerEpoch(consumerEpoch);
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]",
name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
}

consumer.setConsumerEpoch(consumerEpoch);

if (consumer != getActiveConsumer()) {
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
name, consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -322,6 +323,70 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
assertEquals(message.getValue(), test3);
}

@Test
public void testRedeliveryBrokerIgnoreSmallerEpoch() throws Exception{
final String topic = "testRedeliveryBrokerAbortSmallerEpoch";
final String subName = "my-sub";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

consumer.redeliverUnacknowledgedMessages();
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(
"persistent://public/default/testRedeliveryBrokerAbortSmallerEpoch", false).get().get();
Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher()
.getConsumers().get(0).getConsumerEpoch() == 1);
consumer.setConsumerEpoch(0);
producer.send("Hello Pulsar!");

// ignore this redeliver request
consumer.redeliverUnacknowledgedMessages();
consumer.receive();
assertEquals(persistentTopic.getSubscription(subName).getDispatcher()
.getConsumers().get(0).getConsumerEpoch(), 1);
}

@Test
public void testRedeliveryCommandDontCheckClientConnectionState() throws Exception{
final String topic = "testRedeliveryCommandDontCheckClientConnectionState";
final String subName = "my-sub";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

assertEquals(consumer.getState(), HandlerState.State.Ready);
consumer.setState(HandlerState.State.Connecting);
producer.send("Hello Pulsar!");
consumer.receive();
consumer.redeliverUnacknowledgedMessages();
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(
"persistent://public/default/testRedeliveryCommandDontCheckClientConnectionState",
false).get().get();
Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher()
.getConsumers().get(0).getConsumerEpoch() == 1);

// redeliver success, consumer also can receive message again
consumer.receive();
}

@Test(dataProvider = "enableBatch")
public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception {
final String topic = "testRedeliveryAddEpochAndPermits";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2103,8 +2103,10 @@ public void redeliverUnacknowledgedMessages() {
incomingQueueLock.unlock();
}

// is channel is connected, we should send redeliver command to broker
if (cnx != null && isConnected(cnx)) {
// If a subscription command has been sent to the broker, it is necessary to allow the redelivery
// request to be sent to the broker without checking the connection state, as failing to do so would
// result in the client consumer epoch being bigger than the broker consumer epoch.
if (cnx != null) {
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise());
if (currentSize > 0) {
Expand All @@ -2114,9 +2116,6 @@ public void redeliverUnacknowledgedMessages() {
log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,
consumerName, currentSize);
}
} else {
log.warn("[{}] Send redeliver messages command but the client is reconnect or close, "
+ "so don't need to send redeliver command to broker", this);
}
}
}
Expand Down
Loading