Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bouncy-castle/bc/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -205,5 +205,5 @@
This projects includes binary packages with the following licenses:
Bouncy Castle License
* Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
- org.bouncycastle-bcpkix-jdk18on-1.78.1.jar
- org.bouncycastle-bcpkix-jdk18on-1.81.jar
- org.bouncycastle-bcprov-jdk18on-1.78.1.jar
2 changes: 0 additions & 2 deletions bouncy-castle/bc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,11 @@
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-ext-jdk18on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>
</dependencies>

Expand Down
4 changes: 2 additions & 2 deletions bouncy-castle/bcfips/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -205,5 +205,5 @@
This projects includes binary packages with the following licenses:
Bouncy Castle License
* Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
- org.bouncycastle-bcpkix-fips-1.0.1.jar
- org.bouncycastle-bc-fips-1.0.1.jar
- org.bouncycastle-bcpkix-fips-1.0.7.jar
- org.bouncycastle-bc-fips-1.0.2.6.jar
4 changes: 2 additions & 2 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,9 @@ Creative Commons Attribution License

Bouncy Castle License
* Bouncy Castle -- ../licenses/LICENSE-bouncycastle.txt
- org.bouncycastle-bcpkix-jdk18on-1.78.1.jar
- org.bouncycastle-bcpkix-jdk18on-1.81.jar
- org.bouncycastle-bcprov-jdk18on-1.78.1.jar
- org.bouncycastle-bcutil-jdk18on-1.78.1.jar
- org.bouncycastle-bcutil-jdk18on-1.81.jar

------------------------

Expand Down
4 changes: 2 additions & 2 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,9 @@ Creative Commons Attribution License

Bouncy Castle License
* Bouncy Castle -- ../licenses/LICENSE-bouncycastle.txt
- bcpkix-jdk18on-1.78.1.jar
- bcpkix-jdk18on-1.81.jar
- bcprov-jdk18on-1.78.1.jar
- bcutil-jdk18on-1.78.1.jar
- bcutil-jdk18on-1.81.jar

------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
OpReadEntry op =
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition, true);
ledger.asyncReadEntries(op);
}

Expand Down Expand Up @@ -1072,7 +1072,7 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition, skipCondition);
ctx, maxPosition, skipCondition, true);
int opReadId = op.id;
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1937,6 +1937,19 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
return;
}

// Optimization: Check if all entries in this ledger have been deleted (acknowledged)
// If so, skip opening the ledger and move to the next one
if (opReadEntry.cursor != null && opReadEntry.skipOpenLedgerFullyAcked
&& isLedgerFullyAcked(ledgerId, ledgerInfo, opReadEntry.cursor)) {
log.info("[{}] All entries in ledger {} have been acked, skipping ledger opening", name, ledgerId);
// Move to the next ledger
Long nextLedgerId = ledgers.ceilingKey(ledgerId + 1);
opReadEntry.updateReadPosition(
PositionFactory.create(Objects.requireNonNullElseGet(nextLedgerId, () -> ledgerId + 1), 0));
opReadEntry.checkReadCompletion();
return;
}

// Get a ledger handle to read from
getLedgerHandle(ledgerId).thenAccept(ledger -> internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex
-> {
Expand All @@ -1949,6 +1962,56 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
}
}

/**
* Check if all entries in the specified ledger have been acknowledged by the cursor.
* This optimization helps avoid opening ledgers that have no unacked entries.
*
* @param ledgerId the ledger ID to check
* @param ledgerInfo the ledger information
* @param cursor the cursor reading from this ledger
* @return true if all entries in the ledger have been acknowledged, false otherwise
*/
private boolean isLedgerFullyAcked(long ledgerId, LedgerInfo ledgerInfo, ManagedCursorImpl cursor) {
if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
return true;
}

// Get the cursor's mark delete position
Position markDeletedPosition = cursor.getMarkDeletedPosition();
if (markDeletedPosition == null) {
return false;
}

// If the mark delete position is in a later ledger, then this ledger is fully acknowledged
if (markDeletedPosition.getLedgerId() > ledgerId) {
return true;
}

// Check if all entries in this ledger are individually deleted
if (markDeletedPosition.getLedgerId() <= ledgerId) {
final long lastEntryInLedger = ledgerInfo.getEntries() - 1;
Position startPosition = PositionFactory.create(ledgerId, 0);
if (markDeletedPosition.getLedgerId() == ledgerId) {
// The mark delete position represents the last acknowledged entry
// If it points to the last entry in the ledger, then the ledger is fully acknowledged
if (markDeletedPosition.getEntryId() >= lastEntryInLedger) {
return true;
}

startPosition = markDeletedPosition;
}

Range<Position> scanRange =
Range.closed(startPosition, PositionFactory.create(ledgerId, lastEntryInLedger));
long unackMessages = cursor.getNumberOfEntries(scanRange);
// All entries are individually deleted
return unackMessages == 0;
}

// If mark delete position is in an earlier ledger, this ledger is not consumed
return false;
}

public CompletableFuture<LedgerMetadata> getLedgerMetadata(long ledgerId) {
LedgerHandle currentLedger = this.currentLedger;
if (currentLedger != null && ledgerId == currentLedger.getId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ class OpReadEntry implements ReadEntriesCallback {
Position maxPosition;

Predicate<Position> skipCondition;
boolean skipOpenLedgerFullyAcked = false;

public static OpReadEntry create(ManagedCursorImpl cursor, Position readPositionRef, int count,
ReadEntriesCallback callback, Object ctx, Position maxPosition, Predicate<Position> skipCondition) {
ReadEntriesCallback callback, Object ctx, Position maxPosition,
Predicate<Position> skipCondition,
boolean skipOpenLedgerFullyAcked) {
OpReadEntry op = RECYCLER.get();
op.id = opReadIdGenerator.getAndIncrement();
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
Expand All @@ -70,6 +73,7 @@ public static OpReadEntry create(ManagedCursorImpl cursor, Position readPosition
}
op.maxPosition = maxPosition;
op.skipCondition = skipCondition;
op.skipOpenLedgerFullyAcked = skipOpenLedgerFullyAcked;
op.ctx = ctx;
op.nextReadPosition = PositionFactory.create(op.readPosition);
return op;
Expand Down Expand Up @@ -247,6 +251,7 @@ public void recycle() {
nextReadPosition = null;
maxPosition = null;
skipCondition = null;
skipOpenLedgerFullyAcked = false;
recyclerHandle.recycle(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void find() {
}
if (cursor.hasMoreEntries(searchPosition)) {
OpReadEntry opReadEntry = OpReadEntry.create(cursor, searchPosition, batchSize,
this, OpScan.this.ctx, null, null);
this, OpScan.this.ctx, null, null, false);
ledger.asyncReadEntries(opReadEntry);
} else {
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@

import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -4381,7 +4385,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

// op readPosition is bigger than maxReadPosition
OpReadEntry opReadEntry = OpReadEntry.create(cursor, ledger.lastConfirmedEntry, 10, callback,
null, PositionFactory.create(lastPosition.getLedgerId(), -1), null);
null, PositionFactory.create(lastPosition.getLedgerId(), -1), null, true);
Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
field.setAccessible(true);
field.set(cursor, PositionFactory.EARLIEST);
Expand All @@ -4404,7 +4408,7 @@ public void testOpReadEntryRecycle() throws Exception {

@Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = Mockito.mockStatic(OpReadEntry.class);
mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(),
any(), any(), any())).thenAnswer(__ -> createOpReadEntry.get());
any(), any(), any(), anyBoolean())).thenAnswer(__ -> createOpReadEntry.get());

final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
ledgerConfig.setNewEntriesCheckDelayInMillis(10);
Expand Down Expand Up @@ -5458,6 +5462,77 @@ public void close() {
}).toList(), IntStream.range(0, 10).mapToObj(i -> "msg-" + i).toList());
}

@Test
public void testSkipOpenLedgerFullyAcked() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(10);
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
ManagedLedger ledger = factory.open("testSkipOpenLedgerFullyAcked", managedLedgerConfig);
ManagedCursor cursor = ledger.openCursor("cursor");

List<Position> positions = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Position pos = ledger.addEntry(("entry-" + i).getBytes());
positions.add(pos);
}

((ManagedLedgerImpl) ledger).rollCurrentLedgerIfFull();

for (int i = 10; i < 20; i++) {
Position pos = ledger.addEntry(("entry-" + i).getBytes());
positions.add(pos);
}

ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
ManagedCursorImpl cursorImpl = (ManagedCursorImpl) cursor;

for (int i = 0; i < 5; i++) {
cursor.markDelete(positions.get(i));
}

for (int i = 5; i < 10; i++) {
cursor.delete(positions.get(i));
}

long firstLedgerId = positions.get(0).getLedgerId();
long secondLedgerId = positions.get(10).getLedgerId();
log.info("First ledger id is {}, Second ledger id is {}", firstLedgerId, secondLedgerId);

ManagedLedgerImpl spyLedger = spy(ledgerImpl);

Position readPosition = PositionFactory.create(firstLedgerId, 0);
CountDownLatch readLatch = new CountDownLatch(1);

ReadEntriesCallback callback = new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
try {
if (!entries.isEmpty()) {
entries.forEach(Entry::release);
}
} finally {
readLatch.countDown();
}
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.error("Read failed", exception);
readLatch.countDown();
}
};

OpReadEntry opReadEntry = OpReadEntry.create(cursorImpl, readPosition, 5, callback, null, null, null, true);

spyLedger.asyncReadEntries(opReadEntry);

assertTrue(readLatch.await(10, TimeUnit.SECONDS));

verify(spyLedger, never()).getLedgerHandle(firstLedgerId);

ledger.close();
}

class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
Map<Long, Integer> ledgerErrors = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

}
}, null, maxPosition, null);
}, null, maxPosition, null, false);
Assert.assertEquals(opReadEntry.readPosition, position);
}

Expand Down
17 changes: 13 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,12 @@ flexible messaging model and an intuitive client API.</description>
<slf4j.version>2.0.13</slf4j.version>
<commons.collections4.version>4.4</commons.collections4.version>
<log4j2.version>2.23.1</log4j2.version>
<bouncycastle.version>1.78.1</bouncycastle.version>
<!-- bouncycastle dependencies aren't necessarily aligned -->
<bouncycastle.bcprov-jdk18on.version>1.78.1</bouncycastle.bcprov-jdk18on.version>
<bouncycastle.bcpkix-jdk18on.version>1.81</bouncycastle.bcpkix-jdk18on.version>
<bouncycastle.bcprov-ext-jdk18on.version>1.78.1</bouncycastle.bcprov-ext-jdk18on.version>
<bouncycastle.bcpkix-fips.version>1.0.7</bouncycastle.bcpkix-fips.version>
<bouncycastle.bc-fips.version>1.0.2.5</bouncycastle.bc-fips.version>
<bouncycastle.bc-fips.version>1.0.2.6</bouncycastle.bc-fips.version>
<jackson.version>2.17.2</jackson.version>
<fastutil.version>8.5.14</fastutil.version>
<reflections.version>0.10.2</reflections.version>
Expand Down Expand Up @@ -1013,13 +1016,19 @@ flexible messaging model and an intuitive client API.</description>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>${bouncycastle.version}</version>
<version>${bouncycastle.bcprov-jdk18on.version}</version>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<version>${bouncycastle.version}</version>
<version>${bouncycastle.bcpkix-jdk18on.version}</version>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-ext-jdk18on</artifactId>
<version>${bouncycastle.bcprov-ext-jdk18on.version}</version>
</dependency>

<dependency>
Expand Down
Loading