Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<Strin
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition,
boolean isReadCompacted) throws ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition,
boolean isReadCompacted, boolean isReadReverse) throws ManagedLedgerException;

/**
* Delete a ManagedCursor asynchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public interface Position {
*/
Position getNext();

/**
* Get the position of the entry previous to this one. The returned position might point to a non-existing entry
*
* @return the position of the previous logical entry
*/
Position getPrevious();

long getLedgerId();

long getEntryId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
protected final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
// The cursor is considered "dirty" when there are mark-delete updates that are only applied in memory,
Expand Down Expand Up @@ -2401,8 +2401,9 @@ private void updateLastMarkDeleteEntryToLatest(final PositionImpl newPosition,
List<Entry> filterReadEntries(List<Entry> entries) {
lock.readLock().lock();
try {
Range<PositionImpl> entriesRange = Range.closed((PositionImpl) entries.get(0).getPosition(),
(PositionImpl) entries.get(entries.size() - 1).getPosition());
Range<PositionImpl> entriesRange = Range.closed(
(PositionImpl) entries.get(entries.size() - 1).getPosition(),
(PositionImpl) entries.get(0).getPosition());
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Filtering entries {} - alreadyDeleted: {}", ledger.getName(), name, entriesRange,
individualDeletedMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.impl.PositionImpl.LATEST;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -129,6 +130,7 @@
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
Expand Down Expand Up @@ -1094,17 +1096,24 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws Ma
startCursorPosition,
"non-durable-cursor-" + UUID.randomUUID());
}

@Override
public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName)
throws ManagedLedgerException {
return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false);
return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false, false);
}

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName,
public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName,
InitialPosition initialPosition, boolean isReadCompacted)
throws ManagedLedgerException {
return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, isReadCompacted, false);
}

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName,
InitialPosition initialPosition, boolean isReadCompacted,
boolean isReadReverse)
throws ManagedLedgerException {
Objects.requireNonNull(cursorName, "cursor name can't be null");
checkManagedLedgerIsOpen();
checkFenced();
Expand All @@ -1118,7 +1127,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
}

NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
(PositionImpl) startCursorPosition, initialPosition, isReadCompacted);
(PositionImpl) startCursorPosition, initialPosition, isReadCompacted, isReadReverse);
cursor.setActive();

log.info("[{}] Opened new cursor: {}", name, cursor);
Expand Down Expand Up @@ -1849,9 +1858,12 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
} else {
LedgerInfo ledgerInfo = ledgers.get(ledgerId);
if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
if (opReadEntry.readPosition.getLedgerId() - 1 < 0) {
return;
}
// Cursor is pointing to an empty ledger, there's no need to try opening it. Skip this ledger and
// move to the next one
opReadEntry.updateReadPosition(new PositionImpl(opReadEntry.readPosition.getLedgerId() + 1, 0));
opReadEntry.updateReadPosition(new PositionImpl(opReadEntry.readPosition.getLedgerId() - 1, -1));
opReadEntry.checkReadCompletion();
return;
}
Expand Down Expand Up @@ -2009,11 +2021,13 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
return;
}
// Perform the read
long firstEntry = opReadEntry.readPosition.getEntryId();
long lastEntryInLedger;
long firstEntry = 0;
//long lastEntryInLedger = opReadEntry.readPosition.getEntryId();
long lastEntryInLedger = ledger.getLastAddConfirmed();

PositionImpl lastPosition = lastConfirmedEntry;

/*
if (ledger.getId() == lastPosition.getLedgerId()) {
// For the current ledger, we only give read visibility to the last entry we have received a confirmation in
// the managed ledger layer
Expand All @@ -2027,6 +2041,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {
lastEntryInLedger = min(opReadEntry.maxPosition.getEntryId(), lastEntryInLedger);
}
*/

if (firstEntry > lastEntryInLedger) {
if (log.isDebugEnabled()) {
Expand All @@ -2037,9 +2052,9 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
if (currentLedger == null || ledger.getId() != currentLedger.getId()) {
// Cursor was placed past the end of one ledger, move it to the
// beginning of the next ledger
Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1);
Long nextLedgerId = ledgers.ceilingKey(ledger.getId() - 1);
if (nextLedgerId != null) {
opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0));
opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, -1));
} else {
opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0));
}
Expand All @@ -2051,7 +2066,9 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
return;
}

long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
//long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
//long lastEntry = 0;
long lastEntry = lastEntryInLedger;

// Filer out and skip unnecessary read entry
if (opReadEntry.skipCondition != null) {
Expand Down Expand Up @@ -2088,7 +2105,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
}
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
asyncReadEntry(ledger, lastEntry, firstEntry, opReadEntry, opReadEntry.ctx);
}

protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
Expand Down Expand Up @@ -3573,9 +3590,29 @@ public Long getNextValidLedger(long ledgerId) {
return ledgers.ceilingKey(ledgerId + 1);
}

public PositionImpl getPreviousValidPosition(final PositionImpl position) {
return getValidPositionBeforeSkippedEntries(position, 1);
}

public PositionImpl getValidPositionBeforeSkippedEntries(final PositionImpl position, int skippedEntryNum) {
PositionImpl skippedPosition = position.getPositionBeforeEntries(skippedEntryNum);
while (!isValidPosition(skippedPosition)) {
Long previousLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() - 1);
// TODO: this should not happen
/*
if (previousLedgerId == null) {
return lastConfirmedEntry.getNext();
}
*/
skippedPosition = PositionImpl.get(previousLedgerId, LATEST.entryId);
}
return skippedPosition;
}

public PositionImpl getNextValidPosition(final PositionImpl position) {
return getValidPositionAfterSkippedEntries(position, 1);
}

public PositionImpl getValidPositionAfterSkippedEntries(final PositionImpl position, int skippedEntryNum) {
PositionImpl skippedPosition = position.getPositionAfterEntries(skippedEntryNum);
while (!isValidPosition(skippedPosition)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@
public class NonDurableCursorImpl extends ManagedCursorImpl {

private final boolean readCompacted;
private final boolean readReverse;
private PositionImpl initialPosition;

NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition,
boolean isReadCompacted) {
boolean isReadCompacted, boolean readReverse) {
super(bookkeeper, config, ledger, cursorName);
this.readCompacted = isReadCompacted;
this.readReverse = readReverse;

// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
// store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
Expand All @@ -56,20 +59,42 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
}
} else if (startCursorPosition.getLedgerId() == PositionImpl.EARLIEST.getLedgerId()) {
// Start from invalid ledger to read from first available entry
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
if (readReverse) {
recoverReverseCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
} else {
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
}
} else {
// Since the cursor is positioning on the mark-delete position, we need to take 1 step back from the desired
// read-position
recoverCursor(startCursorPosition);
if (readReverse) {
recoverReverseCursor(startCursorPosition);
} else {
recoverCursor(startCursorPosition);
}
}
STATE_UPDATER.set(this, State.Open);
log.info("[{}] Created non-durable cursor read-position={} mark-delete-position={}", ledger.getName(),
readPosition, markDeletePosition);
}

void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.getLeft());
ledger.onCursorReadPositionUpdated(this, readPosition);
markDeletePosition = lastPositionCounter.getLeft();
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, getProperties(), null, null);
persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;

// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is 0, to ensure the initial backlog count is 0.
messagesConsumedCounter = lastPositionCounter.getRight();
}

private void recoverCursor(PositionImpl mdPosition) {
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
this.readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition);
this.initialPosition = this.readPosition;
markDeletePosition = mdPosition;

// Initialize the counter such that the difference between the messages written on the ML and the
Expand All @@ -84,6 +109,51 @@ private void recoverCursor(PositionImpl mdPosition) {
}
}

private void recoverReverseCursor(PositionImpl mdPosition) {
Pair<PositionImpl, Long> firstEntryAndCounter = ledger.getFirstPositionAndCounter();
this.readPosition = isReadCompacted() ? mdPosition.getPrevious() : ledger.getPreviousPosition(mdPosition);
this.initialPosition = this.readPosition;
markDeletePosition = mdPosition;

// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is equal to the current backlog (negated).
if (null != this.readPosition) {
messagesConsumedCounter = readPosition.compareTo(firstEntryAndCounter.getLeft()) > 0
? ledger.getNumberOfEntries(Range.closed(firstEntryAndCounter.getLeft(), readPosition)) : 0;
} else {
log.warn("Recovered a non-durable cursor from position {} but didn't find a valid read position {}",
mdPosition, readPosition);
}
}

@Override
public long getNumberOfEntriesInBacklog(boolean isPrecise) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor ml-entries: {} -- deleted-counter: {} other counters: mdPos {} rdPos {}",
ledger.getName(), this.getName(), this.initialPosition, messagesConsumedCounter, markDeletePosition,
readPosition);
}
if (isPrecise) {
if (readReverse) {
return getNumberOfEntries(Range.open(ledger.getFirstPosition(), markDeletePosition));
} else {
return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}
}

// TODO: Let's store backlog number info in RAM. Currently, we get it precisely.
if (readReverse) {
return getNumberOfEntries(Range.open(ledger.getFirstPosition(), markDeletePosition));
} else {
long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}
return backlog;
}
}

@Override
public boolean isDurable() {
return false;
Expand Down Expand Up @@ -130,16 +200,29 @@ public void rewind() {
// we couldn't reset the read position to the next valid position of the original topic.
// Otherwise, the remaining data in the compacted ledger will be skipped.
if (!readCompacted) {
super.rewind();
lock.writeLock().lock();
try {
PositionImpl newReadPosition = ledger.getPreviousValidPosition(markDeletePosition);
PositionImpl oldReadPosition = readPosition;

log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), this.getName(), oldReadPosition,
newReadPosition);

readPosition = newReadPosition;
ledger.onCursorReadPositionUpdated(NonDurableCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
} else {
readPosition = markDeletePosition.getNext();
}
}

@Override
public synchronized String toString() {
return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
.add("readPos", readPosition).toString();
return MoreObjects.toStringHelper(this).add("ledger", ledger.getName())
.add("ackPos", markDeletePosition).add("readPos", readPosition)
.add("readReverse", readReverse).toString();
}

private static final Logger log = LoggerFactory.getLogger(NonDurableCursorImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class OpReadEntry implements ReadEntriesCallback {
private List<Entry> entries;
private PositionImpl nextReadPosition;
PositionImpl maxPosition;
PositionImpl minPosition;

Predicate<PositionImpl> skipCondition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ public PositionImpl getNext() {
}
}

@Override
public PositionImpl getPrevious() {
if (entryId > 0) {
return PositionImpl.get(ledgerId, entryId - 1);
} else {
return PositionImpl.get(ledgerId, 0);
}
}

/**
* Position after moving entryNum messages,
* if entryNum < 1, then return the current position.
Expand All @@ -109,6 +118,21 @@ public PositionImpl getPositionAfterEntries(int entryNum) {
}
}

/**
* Position after moving entryNum previous messages,
* if entryNum < 1, then return the current position.
* */
public PositionImpl getPositionBeforeEntries(int entryNum) {
if (entryNum < 1) {
return this;
}
if (entryId < 0) {
return PositionImpl.get(ledgerId, entryNum + 1);
} else {
return PositionImpl.get(ledgerId, entryId - entryNum);
}
}

/**
* String representation of virtual cursor - LedgerId:EntryId.
*/
Expand Down
Loading