Skip to content

Commit f838db1

Browse files
shibddao-jun
andauthored
[improve][broker][branch-3.3] Optimize PersistentTopic.getLastDispatchablePosition (#22707) (#23826)
Co-authored-by: 道君 <[email protected]>
1 parent 41c7e29 commit f838db1

File tree

4 files changed

+89
-16
lines changed

4 files changed

+89
-16
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

+54-12
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@ private static class EstimateTimeBasedBacklogQuotaCheckResult {
316316
Long estimatedOldestUnacknowledgedMessageTimestamp;
317317
}
318318

319+
// The last position that can be dispatched to consumers
320+
private volatile Position lastDispatchablePosition;
321+
319322
/***
320323
* We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return
321324
* the in-progress one when it is called the second time.
@@ -3889,18 +3892,57 @@ public Position getLastPosition() {
38893892

38903893
@Override
38913894
public CompletableFuture<Position> getLastDispatchablePosition() {
3892-
return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
3893-
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
3894-
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
3895-
if (Markers.isServerOnlyMarker(md)) {
3896-
return false;
3897-
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
3898-
// Filter-out transaction aborted messages.
3899-
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
3900-
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
3901-
}
3902-
return true;
3903-
}, getMaxReadPosition());
3895+
if (lastDispatchablePosition != null) {
3896+
return CompletableFuture.completedFuture(lastDispatchablePosition);
3897+
}
3898+
return ManagedLedgerImplUtils
3899+
.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
3900+
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
3901+
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
3902+
if (Markers.isServerOnlyMarker(md)) {
3903+
return false;
3904+
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
3905+
// Filter-out transaction aborted messages.
3906+
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
3907+
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
3908+
}
3909+
return true;
3910+
}, getMaxReadPosition())
3911+
.thenApply(position -> {
3912+
// Update lastDispatchablePosition to the given position
3913+
updateLastDispatchablePosition(position);
3914+
return position;
3915+
});
3916+
}
3917+
3918+
/**
3919+
* Update lastDispatchablePosition if the given position is greater than the lastDispatchablePosition.
3920+
*
3921+
* @param position
3922+
*/
3923+
public synchronized void updateLastDispatchablePosition(Position position) {
3924+
// Update lastDispatchablePosition to null if the position is null, fallback to
3925+
// ManagedLedgerImplUtils#asyncGetLastValidPosition
3926+
if (position == null) {
3927+
lastDispatchablePosition = null;
3928+
return;
3929+
}
3930+
3931+
PositionImpl position0 = (PositionImpl) position;
3932+
// If the position is greater than the maxReadPosition, ignore
3933+
if (position0.compareTo(getMaxReadPosition()) > 0) {
3934+
return;
3935+
}
3936+
// If the lastDispatchablePosition is null, set it to the position
3937+
if (lastDispatchablePosition == null) {
3938+
lastDispatchablePosition = position;
3939+
return;
3940+
}
3941+
// If the position is greater than the lastDispatchablePosition, update it
3942+
PositionImpl lastDispatchablePosition0 = (PositionImpl) lastDispatchablePosition;
3943+
if (position0.compareTo(lastDispatchablePosition0) > 0) {
3944+
lastDispatchablePosition = position;
3945+
}
39043946
}
39053947

39063948
@Override

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -377,8 +377,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
377377

378378
@Override
379379
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
380-
if (!isMarkerMessage && maxReadPositionCallBack != null) {
381-
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
380+
if (!isMarkerMessage) {
381+
updateLastDispatchablePosition(position);
382+
if (maxReadPositionCallBack != null) {
383+
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
384+
}
382385
}
383386
}
384387

@@ -436,4 +439,11 @@ public long getCommittedTxnCount() {
436439
.filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.COMMITTED))
437440
.count();
438441
}
442+
443+
// ThreadSafe
444+
private void updateLastDispatchablePosition(Position position) {
445+
if (topic instanceof PersistentTopic t) {
446+
t.updateLastDispatchablePosition(position);
447+
}
448+
}
439449
}

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java

+11
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,11 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
297297
}
298298
}
299299

300+
// ThreadSafe
301+
private void updateLastDispatchablePosition(Position position) {
302+
topic.updateLastDispatchablePosition(position);
303+
}
304+
300305
@Override
301306
public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
302307
return null;
@@ -459,6 +464,8 @@ void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
459464
} else {
460465
updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
461466
}
467+
// Update the last dispatchable position to null if there is a TXN finished.
468+
updateLastDispatchablePosition(null);
462469
}
463470

464471
/**
@@ -523,6 +530,10 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean i
523530
}
524531
}
525532
}
533+
// If the message is a normal message, update the last dispatchable position.
534+
if (!isMarkerMessage) {
535+
updateLastDispatchablePosition(position);
536+
}
526537
}
527538

528539
@Override

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
9999

100100
@Override
101101
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
102-
if (!isMarkerMessage && maxReadPositionCallBack != null) {
103-
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
102+
if (!isMarkerMessage) {
103+
updateLastDispatchablePosition(position);
104+
if (maxReadPositionCallBack != null) {
105+
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
106+
}
104107
}
105108
}
106109

@@ -148,4 +151,11 @@ public long getAbortedTxnCount() {
148151
public long getCommittedTxnCount() {
149152
return 0;
150153
}
154+
155+
// ThreadSafe
156+
private void updateLastDispatchablePosition(Position position) {
157+
if (topic instanceof PersistentTopic t) {
158+
t.updateLastDispatchablePosition(position);
159+
}
160+
}
151161
}

0 commit comments

Comments
 (0)