Skip to content

Commit cb3d743

Browse files
BewareMyPower authored and Demogorgon314 committed
[improve][broker] Reduce the CPU pressure from the transaction buffer in rolling restarts (#23062)
(cherry picked from commit 40c8c23)
1 parent adc71a2 commit cb3d743

10 files changed: +486 -336 lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -919,7 +919,7 @@ public void start() throws PulsarServerException {
919919
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
920920
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
921921

922-
this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
922+
this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);
923923

924924
this.transactionTimer =
925925
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java

+12-3
Original file line number | Diff line number | Diff line change
@@ -22,12 +22,16 @@
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.ConcurrentHashMap;
2424
import java.util.concurrent.atomic.AtomicLong;
25+
import lombok.Getter;
2526
import lombok.extern.slf4j.Slf4j;
27+
import org.apache.pulsar.broker.PulsarServerException;
28+
import org.apache.pulsar.broker.PulsarService;
2629
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
2730
import org.apache.pulsar.broker.systopic.SystemTopicClient;
2831
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
29-
import org.apache.pulsar.client.api.PulsarClient;
32+
import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
3033
import org.apache.pulsar.client.api.PulsarClientException;
34+
import org.apache.pulsar.client.impl.PulsarClientImpl;
3135
import org.apache.pulsar.common.events.EventType;
3236
import org.apache.pulsar.common.naming.NamespaceName;
3337
import org.apache.pulsar.common.naming.TopicName;
@@ -42,6 +46,8 @@ public class SystemTopicTxnBufferSnapshotService<T> {
4246
protected final EventType systemTopicType;
4347

4448
private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>> refCountedWriterMap;
49+
@Getter
50+
private final TableView<T> tableView;
4551

4652
// The class ReferenceCountedWriter will maintain the reference count,
4753
// when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed.
@@ -95,13 +101,16 @@ public synchronized void release() {
95101

96102
}
97103

98-
public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
99-
Class<T> schemaType) {
104+
public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType systemTopicType,
105+
Class<T> schemaType) throws PulsarServerException {
106+
final var client = (PulsarClientImpl) pulsar.getClient();
100107
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
101108
this.systemTopicType = systemTopicType;
102109
this.schemaType = schemaType;
103110
this.clients = new ConcurrentHashMap<>();
104111
this.refCountedWriterMap = new ConcurrentHashMap<>();
112+
this.tableView = new TableView<>(this::createReader,
113+
client.getConfiguration().getOperationTimeoutMs(), pulsar.getExecutor());
105114
}
106115

107116
public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java

+8-18
Original file line number | Diff line number | Diff line change
@@ -18,12 +18,15 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import lombok.Getter;
22+
import org.apache.pulsar.broker.PulsarServerException;
23+
import org.apache.pulsar.broker.PulsarService;
2124
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
2225
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
2326
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
24-
import org.apache.pulsar.client.api.PulsarClient;
2527
import org.apache.pulsar.common.events.EventType;
2628

29+
@Getter
2730
public class TransactionBufferSnapshotServiceFactory {
2831

2932
private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> txnBufferSnapshotService;
@@ -33,29 +36,16 @@ public class TransactionBufferSnapshotServiceFactory {
3336

3437
private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> txnBufferSnapshotIndexService;
3538

36-
public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
37-
this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
39+
public TransactionBufferSnapshotServiceFactory(PulsarService pulsar) throws PulsarServerException {
40+
this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
3841
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
3942
TransactionBufferSnapshotSegment.class);
40-
this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
43+
this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
4144
EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, TransactionBufferSnapshotIndexes.class);
42-
this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
45+
this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
4346
EventType.TRANSACTION_BUFFER_SNAPSHOT, TransactionBufferSnapshot.class);
4447
}
4548

46-
public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> getTxnBufferSnapshotIndexService() {
47-
return this.txnBufferSnapshotIndexService;
48-
}
49-
50-
public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
51-
getTxnBufferSnapshotSegmentService() {
52-
return this.txnBufferSnapshotSegmentService;
53-
}
54-
55-
public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> getTxnBufferSnapshotService() {
56-
return this.txnBufferSnapshotService;
57-
}
58-
5949
public void close() throws Exception {
6050
if (this.txnBufferSnapshotIndexService != null) {
6151
this.txnBufferSnapshotIndexService.close();

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java

+19-54
Original file line number | Diff line number | Diff line change
@@ -21,25 +21,18 @@
2121
import java.util.ArrayList;
2222
import java.util.List;
2323
import java.util.concurrent.CompletableFuture;
24-
import java.util.concurrent.TimeUnit;
25-
import java.util.concurrent.TimeoutException;
2624
import lombok.extern.slf4j.Slf4j;
2725
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
2826
import org.apache.bookkeeper.mledger.impl.PositionImpl;
2927
import org.apache.commons.collections4.map.LinkedMap;
30-
import org.apache.pulsar.broker.service.BrokerServiceException;
3128
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
3229
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
33-
import org.apache.pulsar.broker.systopic.SystemTopicClient;
3430
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
3531
import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
3632
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
37-
import org.apache.pulsar.client.api.Message;
3833
import org.apache.pulsar.client.api.transaction.TxnID;
39-
import org.apache.pulsar.client.impl.PulsarClientImpl;
4034
import org.apache.pulsar.common.naming.TopicName;
4135
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
42-
import org.apache.pulsar.common.util.FutureUtil;
4336

4437
@Slf4j
4538
public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor {
@@ -90,48 +83,27 @@ public boolean checkAbortedTransaction(TxnID txnID) {
9083
return aborts.containsKey(txnID);
9184
}
9285

93-
private long getSystemClientOperationTimeoutMs() throws Exception {
94-
PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
95-
return pulsarClient.getConfiguration().getOperationTimeoutMs();
96-
}
97-
9886
@Override
9987
public CompletableFuture<PositionImpl> recoverFromSnapshot() {
100-
return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
101-
.getTxnBufferSnapshotService()
102-
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
103-
try {
104-
PositionImpl startReadCursorPosition = null;
105-
while (reader.hasMoreEvents()) {
106-
Message<TransactionBufferSnapshot> message = reader.readNextAsync()
107-
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
108-
if (topic.getName().equals(message.getKey())) {
109-
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
110-
if (transactionBufferSnapshot != null) {
111-
handleSnapshot(transactionBufferSnapshot);
112-
startReadCursorPosition = PositionImpl.get(
113-
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
114-
transactionBufferSnapshot.getMaxReadPositionEntryId());
115-
}
116-
}
117-
}
118-
return CompletableFuture.completedFuture(startReadCursorPosition);
119-
} catch (TimeoutException ex) {
120-
Throwable t = FutureUtil.unwrapCompletionException(ex);
121-
String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
122-
+ "transactionBufferSnapshot timeout!", topic.getName());
123-
log.error(errorMessage, t);
124-
return FutureUtil.failedFuture(
125-
new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
126-
} catch (Exception ex) {
127-
log.error("[{}] Transaction buffer recover fail when read "
128-
+ "transactionBufferSnapshot!", topic.getName(), ex);
129-
return FutureUtil.failedFuture(ex);
130-
} finally {
131-
closeReader(reader);
132-
}
133-
}, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
134-
.getExecutor(this));
88+
final var future = new CompletableFuture<PositionImpl>();
89+
final var pulsar = topic.getBrokerService().getPulsar();
90+
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
91+
try {
92+
final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
93+
.getTableView().readLatest(topic.getName());
94+
if (snapshot != null) {
95+
handleSnapshot(snapshot);
96+
final var startReadCursorPosition = new PositionImpl(snapshot.getMaxReadPositionLedgerId(),
97+
snapshot.getMaxReadPositionEntryId());
98+
future.complete(startReadCursorPosition);
99+
} else {
100+
future.complete(null);
101+
}
102+
} catch (Throwable e) {
103+
future.completeExceptionally(e);
104+
}
105+
});
106+
return future;
135107
}
136108

137109
@Override
@@ -190,13 +162,6 @@ public synchronized CompletableFuture<Void> closeAsync() {
190162
return CompletableFuture.completedFuture(null);
191163
}
192164

193-
private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
194-
reader.closeAsync().exceptionally(e -> {
195-
log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
196-
return null;
197-
});
198-
}
199-
200165
private void handleSnapshot(TransactionBufferSnapshot snapshot) {
201166
if (snapshot.getAborts() != null) {
202167
snapshot.getAborts().forEach(abortTxnMetadata ->

0 commit comments

Comments
 (0)