Skip to content

Commit 14dbfd2

Browse files
hangc0276zymap
authored andcommitted
Fix data lost when configured multiple ledger directories (#3329)
(cherry picked from commit 8a76703)
1 parent 9f63cf7 commit 14dbfd2

File tree

4 files changed

+194
-2
lines changed

4 files changed

+194
-2
lines changed

Diff for: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java

+5
Original file line numberDiff line numberDiff line change
@@ -1285,4 +1285,9 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg
12851285
}
12861286
}
12871287
}
1288+
1289+
@VisibleForTesting
1290+
public List<Journal> getJournals() {
1291+
return this.journals;
1292+
}
12881293
}

Diff for: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
226226
* The last mark should first be max journal log id,
227227
* and then max log position in max journal log.
228228
*/
229-
void readLog() {
229+
public void readLog() {
230230
byte[] buff = new byte[16];
231231
ByteBuffer bb = ByteBuffer.wrap(buff);
232232
LogMark mark = new LogMark();

Diff for: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
142142
private final long maxReadAheadBytesSize;
143143

144144
private final Counter flushExecutorTime;
145+
private final boolean singleLedgerDirs;
145146

146147
public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
147148
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
@@ -172,6 +173,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
172173
this.writeCacheMaxSize = writeCacheSize;
173174
this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
174175
this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2);
176+
this.singleLedgerDirs = conf.getLedgerDirs().length == 1;
175177

176178
readCacheMaxSize = readCacheSize;
177179
this.readAheadCacheBatchSize = readAheadCacheBatchSize;
@@ -895,7 +897,9 @@ private void swapWriteCache() {
895897
public void flush() throws IOException {
896898
Checkpoint cp = checkpointSource.newCheckpoint();
897899
checkpoint(cp);
898-
checkpointSource.checkpointComplete(cp, true);
900+
if (singleLedgerDirs) {
901+
checkpointSource.checkpointComplete(cp, true);
902+
}
899903
}
900904

901905
@Override

Diff for: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java

+183
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,21 @@
3131
import io.netty.buffer.Unpooled;
3232
import io.netty.util.ReferenceCountUtil;
3333
import java.io.File;
34+
import java.io.FileInputStream;
3435
import java.io.IOException;
36+
import java.nio.ByteBuffer;
3537
import java.util.List;
3638
import org.apache.bookkeeper.bookie.Bookie;
3739
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
3840
import org.apache.bookkeeper.bookie.BookieException;
3941
import org.apache.bookkeeper.bookie.BookieImpl;
42+
import org.apache.bookkeeper.bookie.CheckpointSource;
43+
import org.apache.bookkeeper.bookie.CheckpointSourceList;
4044
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
4145
import org.apache.bookkeeper.bookie.EntryLocation;
4246
import org.apache.bookkeeper.bookie.LedgerDirsManager;
4347
import org.apache.bookkeeper.bookie.LedgerStorage;
48+
import org.apache.bookkeeper.bookie.LogMark;
4449
import org.apache.bookkeeper.bookie.TestBookieImpl;
4550
import org.apache.bookkeeper.bookie.storage.EntryLogger;
4651
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -639,4 +644,182 @@ public void testStorageStateFlags() throws Exception {
639644

640645
storage = (DbLedgerStorage) new TestBookieImpl(conf).getLedgerStorage();
641646
}
647+
648+
@Test
649+
public void testMultiLedgerDirectoryCheckpoint() throws Exception {
650+
int gcWaitTime = 1000;
651+
File firstDir = new File(tmpDir, "dir1");
652+
File secondDir = new File(tmpDir, "dir2");
653+
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
654+
conf.setGcWaitTime(gcWaitTime);
655+
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
656+
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
657+
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
658+
conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), secondDir.getCanonicalPath() });
659+
660+
BookieImpl bookie = new TestBookieImpl(conf);
661+
ByteBuf entry1 = Unpooled.buffer(1024);
662+
entry1.writeLong(1); // ledger id
663+
entry1.writeLong(2); // entry id
664+
entry1.writeBytes("entry-1".getBytes());
665+
666+
bookie.getLedgerStorage().addEntry(entry1);
667+
// write one entry to first ledger directory and flush with logMark(1, 2),
668+
// only the first ledger directory should have lastMark
669+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
670+
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
671+
672+
File firstDirMark = new File(firstDir + "/current", "lastMark");
673+
File secondDirMark = new File(secondDir + "/current", "lastMark");
674+
675+
// LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
676+
try {
677+
readLogMark(firstDirMark);
678+
readLogMark(secondDirMark);
679+
fail();
680+
} catch (Exception e) {
681+
//
682+
}
683+
684+
// write the second entry to second leger directory and flush with log(4, 5),
685+
// the fist ledger directory's lastMark is (1, 2) and the second ledger directory's lastMark is (4, 5);
686+
ByteBuf entry2 = Unpooled.buffer(1024);
687+
entry2.writeLong(2); // ledger id
688+
entry2.writeLong(1); // entry id
689+
entry2.writeBytes("entry-2".getBytes());
690+
691+
bookie.getLedgerStorage().addEntry(entry2);
692+
// write one entry to first ledger directory and flush with logMark(1, 2),
693+
// only the first ledger directory should have lastMark
694+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
695+
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush();
696+
697+
// LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
698+
try {
699+
readLogMark(firstDirMark);
700+
readLogMark(secondDirMark);
701+
fail();
702+
} catch (Exception e) {
703+
//
704+
}
705+
706+
// The dbLedgerStorage flush also won't trigger lastMark update due to two ledger directories configured.
707+
bookie.getLedgerStorage().flush();
708+
try {
709+
readLogMark(firstDirMark);
710+
readLogMark(secondDirMark);
711+
fail();
712+
} catch (Exception e) {
713+
//
714+
}
715+
716+
// trigger checkpoint simulate SyncThread do checkpoint.
717+
CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
718+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
719+
CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
720+
checkpointSource.checkpointComplete(checkpoint, false);
721+
722+
try {
723+
LogMark firstLogMark = readLogMark(firstDirMark);
724+
LogMark secondLogMark = readLogMark(secondDirMark);
725+
assertEquals(7, firstLogMark.getLogFileId());
726+
assertEquals(8, firstLogMark.getLogFileOffset());
727+
assertEquals(7, secondLogMark.getLogFileId());
728+
assertEquals(8, secondLogMark.getLogFileOffset());
729+
} catch (Exception e) {
730+
fail();
731+
}
732+
733+
// test replay journal lastMark, to make sure we get the right LastMark position
734+
bookie.getJournals().get(0).getLastLogMark().readLog();
735+
LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
736+
assertEquals(7, logMark.getLogFileId());
737+
assertEquals(8, logMark.getLogFileOffset());
738+
}
739+
740+
private LogMark readLogMark(File file) throws IOException {
741+
byte[] buff = new byte[16];
742+
ByteBuffer bb = ByteBuffer.wrap(buff);
743+
LogMark mark = new LogMark();
744+
try (FileInputStream fis = new FileInputStream(file)) {
745+
int bytesRead = fis.read(buff);
746+
if (bytesRead != 16) {
747+
throw new IOException("Couldn't read enough bytes from lastMark."
748+
+ " Wanted " + 16 + ", got " + bytesRead);
749+
}
750+
}
751+
bb.clear();
752+
mark.readLogMark(bb);
753+
754+
return mark;
755+
}
756+
757+
@Test
758+
public void testSingleLedgerDirectoryCheckpoint() throws Exception {
759+
int gcWaitTime = 1000;
760+
File ledgerDir = new File(tmpDir, "dir");
761+
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
762+
conf.setGcWaitTime(gcWaitTime);
763+
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
764+
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
765+
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
766+
conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
767+
768+
BookieImpl bookie = new TestBookieImpl(conf);
769+
ByteBuf entry1 = Unpooled.buffer(1024);
770+
entry1.writeLong(1); // ledger id
771+
entry1.writeLong(2); // entry id
772+
entry1.writeBytes("entry-1".getBytes());
773+
bookie.getLedgerStorage().addEntry(entry1);
774+
775+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
776+
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
777+
778+
File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");
779+
try {
780+
LogMark logMark = readLogMark(ledgerDirMark);
781+
assertEquals(1, logMark.getLogFileId());
782+
assertEquals(2, logMark.getLogFileOffset());
783+
} catch (Exception e) {
784+
fail();
785+
}
786+
787+
ByteBuf entry2 = Unpooled.buffer(1024);
788+
entry2.writeLong(2); // ledger id
789+
entry2.writeLong(1); // entry id
790+
entry2.writeBytes("entry-2".getBytes());
791+
792+
bookie.getLedgerStorage().addEntry(entry2);
793+
// write one entry to first ledger directory and flush with logMark(1, 2),
794+
// only the first ledger directory should have lastMark
795+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
796+
797+
bookie.getLedgerStorage().flush();
798+
try {
799+
LogMark logMark = readLogMark(ledgerDirMark);
800+
assertEquals(4, logMark.getLogFileId());
801+
assertEquals(5, logMark.getLogFileOffset());
802+
} catch (Exception e) {
803+
fail();
804+
}
805+
806+
CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
807+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
808+
CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
809+
checkpointSource.checkpointComplete(checkpoint, false);
810+
811+
try {
812+
LogMark firstLogMark = readLogMark(ledgerDirMark);
813+
assertEquals(7, firstLogMark.getLogFileId());
814+
assertEquals(8, firstLogMark.getLogFileOffset());
815+
} catch (Exception e) {
816+
fail();
817+
}
818+
819+
// test replay journal lastMark, to make sure we get the right LastMark position
820+
bookie.getJournals().get(0).getLastLogMark().readLog();
821+
LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
822+
assertEquals(7, logMark.getLogFileId());
823+
assertEquals(8, logMark.getLogFileOffset());
824+
}
642825
}

0 commit comments

Comments
 (0)