diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index 667c8598529b3..5d6f6f688cb9f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -134,8 +134,6 @@ protected boolean replicateEntries(List entries, InFlightTask inFlightTas msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_POSITION) .setValue(String.format("%s:%s", entry.getLedgerId(), entry.getEntryId())); - headersAndPayload.retain(); - // Increment pending messages for messages produced locally producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg, inFlightTask)); atLeastOneMessageSentForReplication = true; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java index 40ff0633f4fb5..21ead24cdd463 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java @@ -19,14 +19,20 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask; +import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.testng.Assert.assertTrue; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.CustomLog; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerTestBase; @@ -38,9 +44,11 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.schema.Schemas; import org.awaitility.Awaitility; import org.testng.Assert; @@ -146,6 +154,39 @@ public void testShadowReplication() throws Exception { Assert.assertEquals(shadowMessage.getMessageId(), sourceMessage.getMessageId()); } + @Test + public void testShadowReplicatorReleasesSourceEntryBuffer() throws Exception { + String sourceTopicName = BrokerTestUtil.newUniqueName("persistent://prop1/ns-source/source-topic"); + String shadowTopicName = BrokerTestUtil.newUniqueName("persistent://prop1/ns-shadow/shadow-topic"); + + admin.topics().createNonPartitionedTopic(sourceTopicName); + admin.topics().createShadowTopic(shadowTopicName, sourceTopicName); + admin.topics().setShadowTopics(sourceTopicName, Lists.newArrayList(shadowTopicName)); + + PersistentTopic sourceTopic = + (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(sourceTopicName).get().get(); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(sourceTopic.getShadowReplicators().size(), 1)); + ShadowReplicator replicator = (ShadowReplicator) sourceTopic.getShadowReplicators().get(shadowTopicName); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(String.valueOf(replicator.getState()), "Started")); + + Entry entry = createEntry(1, 0, "ref-count-check", 1); + ByteBuf entryBuffer = entry.getDataBuffer(); + Assert.assertEquals(entryBuffer.refCnt(), 1); + + List entries = Lists.newArrayList(entry); + PersistentReplicator.InFlightTask inFlightTask = + new PersistentReplicator.InFlightTask( + entry.getPosition(), entries.size(), replicator.getReplicatorId()); + inFlightTask.setEntries(entries); + Assert.assertTrue(replicator.replicateEntries(entries, inFlightTask)); + + Awaitility.await().untilAsserted(() -> { + Assert.assertTrue(inFlightTask.isDone()); + Assert.assertEquals(entryBuffer.refCnt(), 0); + }); + } + private static PersistentReplicator getAnyShadowReplicator(TopicName topicName, PulsarService pulsar) { PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName.toString(), false).join().get(); @@ -196,4 +237,22 @@ public void testCounterOfPendingMessagesCorrect() throws Exception { ensureNoBacklogByInflightTask(replicator); } -} \ No newline at end of file + private Entry createEntry(long ledgerId, long entryId, String message, long sequenceId) { + ByteBuf headersAndPayload = createMessage(message, sequenceId); + Entry entry = EntryImpl.create(ledgerId, entryId, headersAndPayload); + headersAndPayload.release(); + return entry; + } + + private ByteBuf createMessage(String message, long sequenceId) { + MessageMetadata messageMetadata = new MessageMetadata() + .setSequenceId(sequenceId) + .setProducerName("testProducer") + .setPublishTime(System.currentTimeMillis()); + ByteBuf payload = Unpooled.copiedBuffer(message.getBytes(StandardCharsets.UTF_8)); + ByteBuf headersAndPayload = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, payload); + payload.release(); + return headersAndPayload; + } + +}