diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index bd68d2fb9800c..fbce0279762e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -494,7 +494,7 @@ public CompletableFuture getFuture() {
 
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-        InFlightTask inFlightTask = (InFlightTask) ctx;
+        completeFailedReadTask(ctx);
         if (state != Started) {
             log.info("Replicator was disconnected while reading entries, stopping reads");
             return;
@@ -515,14 +515,12 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             terminate();
             return;
         } else if (!(exception instanceof TooManyRequestsException)) {
-            inFlightTask.setEntries(Collections.emptyList());
             log.error()
                     .attr("ctx", ctx)
                     .attr("waitTimeSec", waitTimeMillis / 1000.0)
                     .exception(exception)
                     .log("Error reading entries, retrying");
         } else {
-            inFlightTask.setEntries(Collections.emptyList());
             log.debug()
                     .attr("ctx", ctx)
                     .attr("waitTimeSec", waitTimeMillis / 1000.0)
@@ -533,6 +531,24 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Completes the in-flight read task carried by {@code ctx} after a failed read.
+     *
+     * <p>If {@code ctx} is an {@link InFlightTask} whose entries have not been set yet, an empty entry
+     * list is assigned to it. {@link #readEntriesFailed} calls this before its state check, so the task
+     * is also completed when the replicator is no longer started.
+     *
+     * @param ctx the callback context of the failed read; ignored unless it is an {@link InFlightTask}
+     */
+    private void completeFailedReadTask(Object ctx) {
+        if (ctx instanceof InFlightTask) {
+            InFlightTask inFlightTask = (InFlightTask) ctx;
+            if (inFlightTask.entries == null) {
+                inFlightTask.setEntries(Collections.emptyList());
+            }
+        }
+    }
+
     public CompletableFuture clearBacklog() {
         CompletableFuture future = new CompletableFuture<>();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
index 0499dd90e990e..478437ffde015 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java
@@ -28,17 +28,25 @@
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.CustomLog;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerTest;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerServiceInternalMethodInvoker;
 import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.awaitility.Awaitility;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
@@ -107,6 +115,66 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
         assertTrue(counter.get() <= 1);
     }
 
+    /**
+     * Verifies that a read failure reported after the replicator has been terminated still completes the
+     * in-flight read task, so {@code hasPendingRead()} eventually returns {@code false}.
+     */
+    @Test
+    public void testReadEntriesFailedCompletesInFlightTaskAfterReplicatorTerminated() throws Exception {
+        String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        CountDownLatch readStarted = new CountDownLatch(1);
+        CountDownLatch failRead = new CountDownLatch(1);
+        // Remember the current setting so the finally block can put it back once this test is done.
+        String originalReplicationStartAt = pulsar1.getConfig().getReplicationStartAt();
+        Producer<String> producer = null;
+        try {
+            admin1.topics().createNonPartitionedTopic(topicName);
+            admin2.topics().createNonPartitionedTopic(topicName);
+            producer = client1.newProducer(Schema.STRING).topic(topicName).create();
+            producer.send("msg");
+
+            PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false)
+                    .join().get();
+            ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+            // The supplied callback waits on failRead, which is released only after terminate() below.
+            ManagedLedgerTest.makeReadEntryProbFail(ml, () -> {
+                readStarted.countDown();
+                try {
+                    if (!failRead.await(30, TimeUnit.SECONDS)) {
+                        return new ManagedLedgerException("Timed out waiting to fail read entries");
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return new ManagedLedgerException(e);
+                }
+                return new ManagedLedgerException.TooManyRequestsException("mocked read failure");
+            });
+
+            pulsar1.getConfig().setReplicationStartAt("earliest");
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+            assertTrue(readStarted.await(30, TimeUnit.SECONDS));
+
+            PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators().get(cluster2);
+            Assert.assertNotNull(replicator, "Replicator should not be null");
+            Assert.assertTrue(replicator.hasPendingRead());
+
+            replicator.terminate();
+            Assert.assertNotEquals(replicator.getState(), AbstractReplicator.State.Started);
+            failRead.countDown();
+
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+                    Assert.assertFalse(replicator.hasPendingRead()));
+        } finally {
+            failRead.countDown();
+            pulsar1.getConfig().setReplicationStartAt(originalReplicationStartAt);
+            if (producer != null) {
+                producer.close();
+            }
+            admin1.topics().delete(topicName, true);
+            admin2.topics().delete(topicName, true);
+        }
+    }
+
     @Test
     public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
         log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");