Skip to content

Commit 272c180

Browse files
lhotarieolivelli
authored andcommitted
[Broker] Fix replicated subscriptions direct memory leak
(cherry picked from commit b09ad5a)
1 parent 6887b65 commit 272c180

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscrip
123123
}
124124

125125
ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
126-
topic.publishMessage(subscriptionUpdate, this);
126+
writeMarker(subscriptionUpdate);
127127
}
128128

129129
private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
@@ -140,8 +140,7 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ
140140
request.getSourceCluster(),
141141
localCluster,
142142
lastMsgId.getLedgerId(), lastMsgId.getEntryId());
143-
144-
topic.publishMessage(marker, this);
143+
writeMarker(marker);
145144
}
146145

147146
private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
@@ -276,7 +275,11 @@ void snapshotCompleted(String snapshotId) {
276275
}
277276

278277
void writeMarker(ByteBuf marker) {
279-
topic.publishMessage(marker, this);
278+
try {
279+
topic.publishMessage(marker, this);
280+
} finally {
281+
marker.release();
282+
}
280283
}
281284

282285
/**

0 commit comments

Comments
 (0)