13
13
*/
14
14
package io .streamnative .pulsar .handlers .kop .storage ;
15
15
16
+ import com .google .common .annotations .VisibleForTesting ;
16
17
import com .google .common .collect .Maps ;
17
18
import java .util .ArrayList ;
18
19
import java .util .HashMap ;
22
23
import java .util .TreeMap ;
23
24
import java .util .concurrent .CompletableFuture ;
24
25
import java .util .concurrent .Executor ;
26
+ import java .util .concurrent .atomic .AtomicLong ;
25
27
import lombok .Getter ;
26
28
import lombok .extern .slf4j .Slf4j ;
27
29
import org .apache .bookkeeper .util .SafeRunnable ;
@@ -47,21 +49,27 @@ public class ProducerStateManager {
47
49
private final ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer ;
48
50
49
51
private final int kafkaTxnProducerStateTopicSnapshotIntervalSeconds ;
52
+ private final int kafkaTxnPurgeAbortedTxnIntervalSeconds ;
50
53
51
54
private volatile long mapEndOffset = -1 ;
52
55
53
56
private long lastSnapshotTime ;
57
+ private long lastPurgeAbortedTxnTime ;
54
58
59
+ private volatile long abortedTxnsPurgeOffset = -1 ;
55
60
56
61
public ProducerStateManager (String topicPartition ,
57
62
String kafkaTopicUUID ,
58
63
ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer ,
59
- int kafkaTxnProducerStateTopicSnapshotIntervalSeconds ) {
64
+ int kafkaTxnProducerStateTopicSnapshotIntervalSeconds ,
65
+ int kafkaTxnPurgeAbortedTxnIntervalSeconds ) {
60
66
this .topicPartition = topicPartition ;
61
67
this .kafkaTopicUUID = kafkaTopicUUID ;
62
68
this .producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBuffer ;
63
69
this .kafkaTxnProducerStateTopicSnapshotIntervalSeconds = kafkaTxnProducerStateTopicSnapshotIntervalSeconds ;
70
+ this .kafkaTxnPurgeAbortedTxnIntervalSeconds = kafkaTxnPurgeAbortedTxnIntervalSeconds ;
64
71
this .lastSnapshotTime = System .currentTimeMillis ();
72
+ this .lastPurgeAbortedTxnTime = System .currentTimeMillis ();
65
73
}
66
74
67
75
public CompletableFuture <Void > recover (PartitionLog partitionLog , Executor executor ) {
@@ -159,6 +167,34 @@ void maybeTakeSnapshot(Executor executor) {
159
167
takeSnapshot (executor );
160
168
}
161
169
170
+ void updateAbortedTxnsPurgeOffset (long abortedTxnsPurgeOffset ) {
171
+ if (log .isDebugEnabled ()) {
172
+ log .debug ("{} updateAbortedTxnsPurgeOffset {}" , topicPartition , abortedTxnsPurgeOffset );
173
+ }
174
+ if (abortedTxnsPurgeOffset < 0 ) {
175
+ return ;
176
+ }
177
+ this .abortedTxnsPurgeOffset = abortedTxnsPurgeOffset ;
178
+ }
179
+
180
+ long maybePurgeAbortedTx () {
181
+ if (mapEndOffset == -1 ) {
182
+ return 0 ;
183
+ }
184
+ long now = System .currentTimeMillis ();
185
+ long deltaFromLast = (now - lastPurgeAbortedTxnTime ) / 1000 ;
186
+ if (deltaFromLast / 1000 <= kafkaTxnPurgeAbortedTxnIntervalSeconds ) {
187
+ return 0 ;
188
+ }
189
+ lastPurgeAbortedTxnTime = now ;
190
+ return executePurgeAbortedTx ();
191
+ }
192
+
193
+ @ VisibleForTesting
194
+ long executePurgeAbortedTx () {
195
+ return purgeAbortedTxns (abortedTxnsPurgeOffset );
196
+ }
197
+
162
198
private ProducerStateManagerSnapshot getProducerStateManagerSnapshot () {
163
199
ProducerStateManagerSnapshot snapshot ;
164
200
synchronized (abortedIndexList ) {
@@ -270,15 +306,39 @@ public void completeTxn(CompletedTxn completedTxn) {
270
306
}
271
307
}
272
308
309
+ public boolean hasSomeAbortedTransactions () {
310
+ return !abortedIndexList .isEmpty ();
311
+ }
312
+
313
+ public long purgeAbortedTxns (long offset ) {
314
+ AtomicLong count = new AtomicLong ();
315
+ synchronized (abortedIndexList ) {
316
+ abortedIndexList .removeIf (tx -> {
317
+ boolean toRemove = tx .lastOffset () < offset ;
318
+ if (toRemove ) {
319
+ log .info ("Transaction {} can be removed (lastOffset {} < {})" , tx , tx .lastOffset (), offset );
320
+ count .incrementAndGet ();
321
+ }
322
+ return toRemove ;
323
+ });
324
+ if (!abortedIndexList .isEmpty ()) {
325
+ log .info ("There are still {} aborted tx on {}" , abortedIndexList .size (), topicPartition );
326
+ }
327
+ }
328
+ return count .get ();
329
+ }
330
+
273
331
public List <FetchResponse .AbortedTransaction > getAbortedIndexList (long fetchOffset ) {
274
- List <FetchResponse .AbortedTransaction > abortedTransactions = new ArrayList <>();
275
- for (AbortedTxn abortedTxn : abortedIndexList ) {
276
- if (abortedTxn .lastOffset () >= fetchOffset ) {
277
- abortedTransactions .add (
278
- new FetchResponse .AbortedTransaction (abortedTxn .producerId (), abortedTxn .firstOffset ()));
332
+ synchronized (abortedIndexList ) {
333
+ List <FetchResponse .AbortedTransaction > abortedTransactions = new ArrayList <>();
334
+ for (AbortedTxn abortedTxn : abortedIndexList ) {
335
+ if (abortedTxn .lastOffset () >= fetchOffset ) {
336
+ abortedTransactions .add (
337
+ new FetchResponse .AbortedTransaction (abortedTxn .producerId (), abortedTxn .firstOffset ()));
338
+ }
279
339
}
340
+ return abortedTransactions ;
280
341
}
281
- return abortedTransactions ;
282
342
}
283
343
284
344
public void handleMissingDataBeforeRecovery (long minOffset , long snapshotOffset ) {
0 commit comments