21
21
import com .google .common .annotations .VisibleForTesting ;
22
22
import java .util .Objects ;
23
23
import java .util .Optional ;
24
+ import java .util .SortedMap ;
24
25
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
25
26
import java .util .concurrent .atomic .LongAdder ;
26
27
import org .apache .bookkeeper .mledger .AsyncCallbacks .FindEntryCallback ;
30
31
import org .apache .bookkeeper .mledger .ManagedLedgerException .LedgerNotExistException ;
31
32
import org .apache .bookkeeper .mledger .ManagedLedgerException .NonRecoverableLedgerException ;
32
33
import org .apache .bookkeeper .mledger .Position ;
34
+ import org .apache .bookkeeper .mledger .impl .ManagedCursorImpl ;
33
35
import org .apache .bookkeeper .mledger .impl .ManagedLedgerImpl ;
34
36
import org .apache .bookkeeper .mledger .impl .PositionImpl ;
37
+ import org .apache .bookkeeper .mledger .proto .MLDataFormats ;
35
38
import org .apache .pulsar .client .impl .MessageImpl ;
36
39
import org .apache .pulsar .common .api .proto .CommandSubscribe .SubType ;
37
40
import org .apache .pulsar .common .protocol .Commands ;
@@ -78,7 +81,9 @@ public boolean expireMessages(int messageTTLInSeconds) {
78
81
if (expirationCheckInProgressUpdater .compareAndSet (this , FALSE , TRUE )) {
79
82
log .info ("[{}][{}] Starting message expiry check, ttl= {} seconds" , topicName , subName ,
80
83
messageTTLInSeconds );
81
-
84
+ // First filter the entire Ledger reached TTL based on the Ledger closing time to avoid client clock skew
85
+ checkExpiryByLedgerClosureTime (cursor , messageTTLInSeconds );
86
+ // Some part of entries in active Ledger may have reached TTL, so we need to continue searching.
82
87
cursor .asyncFindNewestMatching (ManagedCursor .FindPositionConstraint .SearchActiveEntries , entry -> {
83
88
try {
84
89
long entryTimestamp = Commands .getEntryTimestamp (entry .getDataBuffer ());
@@ -99,6 +104,34 @@ public boolean expireMessages(int messageTTLInSeconds) {
99
104
return false ;
100
105
}
101
106
}
107
+ private void checkExpiryByLedgerClosureTime (ManagedCursor cursor , int messageTTLInSeconds ) {
108
+ if (messageTTLInSeconds <= 0 ) {
109
+ return ;
110
+ }
111
+ if (cursor instanceof ManagedCursorImpl managedCursor ) {
112
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl ) managedCursor .getManagedLedger ();
113
+ Position deletedPosition = managedCursor .getMarkDeletedPosition ();
114
+ SortedMap <Long , MLDataFormats .ManagedLedgerInfo .LedgerInfo > ledgerInfoSortedMap =
115
+ managedLedger .getLedgersInfo ().subMap (deletedPosition .getLedgerId (), true ,
116
+ managedLedger .getLedgersInfo ().lastKey (), true );
117
+ MLDataFormats .ManagedLedgerInfo .LedgerInfo info = null ;
118
+ for (MLDataFormats .ManagedLedgerInfo .LedgerInfo ledgerInfo : ledgerInfoSortedMap .values ()) {
119
+ if (!ledgerInfo .hasTimestamp () || !MessageImpl .isEntryExpired (messageTTLInSeconds ,
120
+ ledgerInfo .getTimestamp ())) {
121
+ break ;
122
+ }
123
+ info = ledgerInfo ;
124
+ }
125
+ if (info != null && info .getLedgerId () > -1 ) {
126
+ PositionImpl position = PositionImpl .get (info .getLedgerId (), info .getEntries () - 1 );
127
+ if (((PositionImpl ) managedLedger .getLastConfirmedEntry ()).compareTo (position ) < 0 ) {
128
+ findEntryComplete (managedLedger .getLastConfirmedEntry (), null );
129
+ } else {
130
+ findEntryComplete (position , null );
131
+ }
132
+ }
133
+ }
134
+ }
102
135
103
136
public boolean expireMessages (Position messagePosition ) {
104
137
// If it's beyond last position of this topic, do nothing.
0 commit comments