Skip to content

[improve] [ml] compress individual ack info to make maintain more records #21105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.concurrent.FastThreadLocal;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand All @@ -49,7 +50,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down Expand Up @@ -91,9 +91,10 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
Expand All @@ -117,6 +118,22 @@ public class ManagedCursorImpl implements ManagedCursor {

return 0;
};

private static final FastThreadLocal<MLDataFormats.NestedPositionInfo.Builder> NESTED_POSITION_BUILDER =
new FastThreadLocal<>() {
protected MLDataFormats.NestedPositionInfo.Builder initialValue() throws Exception {
return MLDataFormats.NestedPositionInfo
.newBuilder();
}
};

private static final FastThreadLocal<MLDataFormats.MessageRange.Builder> NESTED_RANGE_BUILDER =
new FastThreadLocal<>() {
protected MLDataFormats.MessageRange.Builder initialValue() throws Exception {
return MLDataFormats.MessageRange.newBuilder();
}
};

protected final BookKeeper bookkeeper;
protected final ManagedLedgerConfig config;
protected final ManagedLedgerImpl ledger;
Expand Down Expand Up @@ -2954,53 +2971,99 @@ private static List<StringProperty> buildStringPropertiesMap(Map<String, String>
return stringProperties;
}

private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
@VisibleForTesting
List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
Comment on lines +2974 to +2975
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding "VisibleForTesting" annotation here, please extract this logic to a separate class which is tested separately. There's a lot of complexity here and this would be very hard to maintain in the future in this form. Smaller classes and smaller methods are a general preference in maintainable code.

lock.readLock().lock();
try {
if (individualDeletedMessages.isEmpty()) {
this.individualDeletedMessagesSerializedSize = 0;
return Collections.emptyList();
}

MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
.newBuilder();

MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange
.newBuilder();

AtomicInteger acksSerializedSize = new AtomicInteger(0);
List<MessageRange> rangeList = new ArrayList<>();

// Result and tools.
final List<MLDataFormats.MessageRange> rangeList = new ArrayList<>();
final MLDataFormats.NestedPositionInfo.Builder posBuilder = NESTED_POSITION_BUILDER.get();
final MLDataFormats.MessageRange.Builder rangeBuilder = NESTED_RANGE_BUILDER.get();
// Variables for the loop.
final MutableObject<long[]> preNode = new MutableObject<>();
preNode.setValue(new long[4]);
final MutableObject<long[]> startNodeForMerge = new MutableObject<>();
startNodeForMerge.setValue(new long[]{-1, -1});
final MutableObject<PositionImpl> nextPosOfPreNode = new MutableObject<>();
final MutableInt mergeNodeCounter = new MutableInt();
// Loop.
individualDeletedMessages.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
MLDataFormats.NestedPositionInfo lowerPosition = nestedPositionBuilder
.setLedgerId(lowerKey)
.setEntryId(lowerValue)
.build();

MLDataFormats.NestedPositionInfo upperPosition = nestedPositionBuilder
.setLedgerId(upperKey)
.setEntryId(upperValue)
.build();

MessageRange messageRange = messageRangeBuilder
.setLowerEndpoint(lowerPosition)
.setUpperEndpoint(upperPosition)
.build();

acksSerializedSize.addAndGet(messageRange.getSerializedSize());
rangeList.add(messageRange);
// If there is no entry between the previous node and the current node, the previous node is not the
// last node to merge.
// Else: the previous node is the last node to merge.
if (mergeNodeCounter.getValue() > 0) {
if (lowerKey > nextPosOfPreNode.getValue().getLedgerId()
|| lowerValue >= nextPosOfPreNode.getValue().getEntryId()) {
rangeList.add(buildMessageRangeForPersist(rangeBuilder, posBuilder,
startNodeForMerge.getValue()[0], startNodeForMerge.getValue()[1],
preNode.getValue()[2], preNode.getValue()[3]));
mergeNodeCounter.setValue(0);
startNodeForMerge.getValue()[0] = -1;
}
}
// If the node is the last node of a ledger: current node is not the last node to merge.
// Else: current node is the last node to merge.
PositionImpl nexPos = ledger.getNextValidPosition(PositionImpl.get(upperKey, upperValue));
if (nexPos.getLedgerId() > upperKey) {
if (startNodeForMerge.getValue()[0] == -1) {
// Is the first node to merge.
startNodeForMerge.getValue()[0] = lowerKey;
startNodeForMerge.getValue()[1] = lowerValue;
mergeNodeCounter.setValue(1);
} else {
// Is not the first node to merge.
mergeNodeCounter.increment();
}
} else {
if (mergeNodeCounter.getValue().equals(0)) {
rangeList.add(buildMessageRangeForPersist(rangeBuilder, posBuilder,
lowerKey, lowerValue, upperKey, upperValue));
} else {
rangeList.add(buildMessageRangeForPersist(rangeBuilder, posBuilder,
startNodeForMerge.getValue()[0], startNodeForMerge.getValue()[1], upperKey,
upperValue));
startNodeForMerge.getValue()[0] = -1;
}
mergeNodeCounter.setValue(0);
nextPosOfPreNode.setValue(null);
}

nextPosOfPreNode.setValue(nexPos);
preNode.getValue()[0] = lowerKey;
preNode.getValue()[1] = lowerValue;
preNode.getValue()[2] = upperKey;
preNode.getValue()[3] = upperValue;
return rangeList.size() <= config.getMaxUnackedRangesToPersist();
});

this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
individualDeletedMessages.resetDirtyKeys();
// If the last node still can be merged with next, it should be the last range.
if (mergeNodeCounter.getValue() > 0) {
rangeList.add(buildMessageRangeForPersist(rangeBuilder, posBuilder, startNodeForMerge.getValue()[0],
startNodeForMerge.getValue()[1], preNode.getValue()[2], preNode.getValue()[3]));
}
return rangeList;
} finally {
lock.readLock().unlock();
}
}

private MLDataFormats.MessageRange buildMessageRangeForPersist(MLDataFormats.MessageRange.Builder rangeBuilder,
MLDataFormats.NestedPositionInfo.Builder posBuilder,
long lowerKey, long lowerValue, long upperKey,
long upperValue) {
MLDataFormats.NestedPositionInfo lowerPosition = posBuilder
.setLedgerId(lowerKey)
.setEntryId(lowerValue)
.build();
MLDataFormats.NestedPositionInfo upperPosition = posBuilder
.setLedgerId(upperKey)
.setEntryId(upperValue)
.build();
return rangeBuilder
.setLowerEndpoint(lowerPosition)
.setUpperEndpoint(upperPosition)
.build();
}

private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
lock.readLock().lock();
try {
Expand Down
Loading