Skip to content

Commit c4196fb

Browse files
nodeceTechnoboy-
authored andcommitted
[fix][broker] Record GeoPersistentReplicator.msgOut before producer#sendAsync (#21673)
Signed-off-by: Zixuan Liu <[email protected]>
1 parent 2393ca7 commit c4196fb

File tree

1 file changed

+1
-3
lines changed

1 file changed

+1
-3
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ protected boolean replicateEntries(List<Entry> entries) {
149149
}
150150

151151
dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));
152-
153-
msgOut.recordEvent(headersAndPayload.readableBytes());
154-
155152
msg.setReplicatedFrom(localCluster);
156153

157154
headersAndPayload.retain();
@@ -181,6 +178,7 @@ protected boolean replicateEntries(List<Entry> entries) {
181178
msg.setSchemaInfoForReplicator(schemaFuture.get());
182179
msg.getMessageBuilder().clearTxnidMostBits();
183180
msg.getMessageBuilder().clearTxnidLeastBits();
181+
msgOut.recordEvent(headersAndPayload.readableBytes());
184182
// Increment pending messages for messages produced locally
185183
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
186184
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));

0 commit comments

Comments
 (0)