Skip to content

Commit a3f5289

Browse files
Technoboy-gaoran10
authored andcommitted
Fix lost message issue due to ledger rollover. (#14664)
(cherry picked from commit ad2cc2d)
1 parent 5258f12 commit a3f5289

File tree

5 files changed

+49
-10
lines changed

5 files changed

+49
-10
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -762,8 +762,8 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
762762
}
763763
} else if (state == State.ClosedLedger) {
764764
// No ledger and no pending operations. Create a new ledger
765-
log.info("[{}] Creating a new ledger", name);
766765
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
766+
log.info("[{}] Creating a new ledger", name);
767767
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
768768
mbean.startDataLedgerCreateOp();
769769
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
@@ -1588,8 +1588,8 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
15881588
}
15891589

15901590
synchronized void createLedgerAfterClosed() {
1591-
if(isNeededCreateNewLedgerAfterCloseLedger()) {
1592-
log.info("[{}] Creating a new ledger", name);
1591+
if (isNeededCreateNewLedgerAfterCloseLedger()) {
1592+
log.info("[{}] Creating a new ledger after closed", name);
15931593
STATE_UPDATER.set(this, State.CreatingLedger);
15941594
this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
15951595
mbean.startDataLedgerCreateOp();
@@ -1612,8 +1612,8 @@ boolean isNeededCreateNewLedgerAfterCloseLedger() {
16121612
@Override
16131613
public void rollCurrentLedgerIfFull() {
16141614
log.info("[{}] Start checking if current ledger is full", name);
1615-
if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
1616-
STATE_UPDATER.set(this, State.ClosingLedger);
1615+
if (currentLedgerEntries > 0 && currentLedgerIsFull()
1616+
&& STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
16171617
currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
16181618
@Override
16191619
public void closeComplete(int rc, LedgerHandle lh, Object o) {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -2238,6 +2238,9 @@ void testFindNewestMatchingAfterLedgerRollover() throws Exception {
22382238
// roll a new ledger
22392239
int numLedgersBefore = ledger.getLedgersInfo().size();
22402240
ledger.getConfig().setMaxEntriesPerLedger(1);
2241+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
2242+
stateUpdater.setAccessible(true);
2243+
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
22412244
ledger.rollCurrentLedgerIfFull();
22422245
Awaitility.await().atMost(20, TimeUnit.SECONDS)
22432246
.until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

+33-4
Original file line numberDiff line numberDiff line change
@@ -1936,6 +1936,9 @@ public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
19361936
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
19371937
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
19381938
// let current ledger close
1939+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
1940+
stateUpdater.setAccessible(true);
1941+
stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
19391942
ml.rollCurrentLedgerIfFull();
19401943
// let retention expire
19411944
Thread.sleep(1500);
@@ -2205,6 +2208,9 @@ public void testGetPositionAfterN() throws Exception {
22052208
managedCursor.markDelete(positionMarkDelete);
22062209

22072210
//trigger ledger rollover and wait for the new ledger created
2211+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
2212+
stateUpdater.setAccessible(true);
2213+
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
22082214
managedLedger.rollCurrentLedgerIfFull();
22092215
Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
22102216
assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
@@ -3063,7 +3069,7 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
30633069
ledger.addEntry(new byte[1024 * 1024]);
30643070
}
30653071

3066-
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2);
3072+
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
30673073
List<Entry> entries = cursor.readEntries(msgNum);
30683074
Assert.assertEquals(msgNum, entries.size());
30693075

@@ -3074,9 +3080,12 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
30743080

30753081
// all the messages have benn acknowledged
30763082
// and all the ledgers have been removed except the last ledger
3077-
Thread.sleep(1000);
3078-
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
3079-
Assert.assertEquals(ledger.getTotalSize(), 0);
3083+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
3084+
stateUpdater.setAccessible(true);
3085+
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
3086+
ledger.rollCurrentLedgerIfFull();
3087+
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
3088+
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
30803089
}
30813090

30823091
@Test
@@ -3094,6 +3103,26 @@ public void testLedgerReachMaximumRolloverTime() throws Exception {
30943103
.until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId());
30953104
}
30963105

3106+
@Test
3107+
public void testLedgerNotRolloverWithoutOpenState() throws Exception {
3108+
ManagedLedgerConfig config = new ManagedLedgerConfig();
3109+
config.setMaxEntriesPerLedger(2);
3110+
3111+
ManagedLedgerImpl ml = spy((ManagedLedgerImpl)factory.open("ledger-not-rollover-without-open-state", config));
3112+
ml.addEntry("test1".getBytes()).getLedgerId();
3113+
long ledgerId2 = ml.addEntry("test2".getBytes()).getLedgerId();
3114+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
3115+
stateUpdater.setAccessible(true);
3116+
// Set state to CreatingLedger to avoid rollover
3117+
stateUpdater.set(ml, ManagedLedgerImpl.State.CreatingLedger);
3118+
ml.rollCurrentLedgerIfFull();
3119+
Field currentLedger = ManagedLedgerImpl.class.getDeclaredField("currentLedger");
3120+
currentLedger.setAccessible(true);
3121+
LedgerHandle lh = (LedgerHandle) currentLedger.get(ml);
3122+
Awaitility.await()
3123+
.until(() -> ledgerId2 == lh.getId());
3124+
}
3125+
30973126
@Test
30983127
public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
30993128
ManagedLedgerConfig config = new ManagedLedgerConfig();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import java.lang.reflect.Field;
2122
import java.time.Duration;
2223
import java.util.concurrent.TimeUnit;
2324
import lombok.Cleanup;
@@ -98,6 +99,9 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
9899
});
99100

100101
// trigger a ledger rollover
102+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
103+
stateUpdater.setAccessible(true);
104+
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
101105
managedLedger.rollCurrentLedgerIfFull();
102106

103107
// the last ledger will be closed and removed and we have one ledger for empty

pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,11 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E
164164
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog);
165165
Position position = managedLedger.getLastConfirmedEntry();
166166
if (isUseManagedLedgerProperties) {
167+
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
168+
stateUpdater.setAccessible(true);
169+
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
170+
managedLedger.rollCurrentLedgerIfFull();
167171
Awaitility.await().until(() -> {
168-
managedLedger.rollCurrentLedgerIfFull();
169172
return !managedLedger.ledgerExists(position.getLedgerId());
170173
});
171174
}

0 commit comments

Comments
 (0)