Skip to content

Commit a2c3aee

Browse files
zymaphangc0276
authored andcommitted
Allow to set max operation numbers in a single rocksdb batch (#4044)
--- In rocksdb, the memory usage is related to the batch size. The more operations in a single batch, the more memory is consumed. Expose the configuration to allow control the batch size. (cherry picked from commit ad0ed21)
1 parent 2f3ab1c commit a2c3aee

File tree

4 files changed

+91
-2
lines changed

4 files changed

+91
-2
lines changed

Diff for: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java

+4
Original file line numberDiff line numberDiff line change
@@ -168,5 +168,9 @@ public interface Batch extends Closeable {
168168
void clear();
169169

170170
void flush() throws IOException;
171+
172+
default int batchCount() {
173+
return -1;
174+
}
171175
}
172176
}

Diff for: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
6969

7070
private final ReadOptions optionCache;
7171
private final ReadOptions optionDontCache;
72-
7372
private final WriteBatch emptyBatch;
73+
private final int writeBatchMaxSize;
7474

7575
private static final String ROCKSDB_LOG_PATH = "dbStorage_rocksDB_logPath";
7676
private static final String ROCKSDB_LOG_LEVEL = "dbStorage_rocksDB_logLevel";
@@ -206,6 +206,7 @@ public KeyValueStorageRocksDB(String basePath, String subPath, DbConfigType dbCo
206206

207207
optionCache.setFillCache(true);
208208
optionDontCache.setFillCache(false);
209+
this.writeBatchMaxSize = conf.getMaxOperationNumbersInSingleRocksDBBatch();
209210
}
210211

211212
@Override
@@ -403,21 +404,29 @@ public long count() throws IOException {
403404

404405
@Override
405406
public Batch newBatch() {
406-
return new RocksDBBatch();
407+
return new RocksDBBatch(writeBatchMaxSize);
407408
}
408409

409410
private class RocksDBBatch implements Batch {
410411
private final WriteBatch writeBatch = new WriteBatch();
412+
private final int batchSize;
413+
private int batchCount = 0;
414+
415+
RocksDBBatch(int batchSize) {
416+
this.batchSize = batchSize;
417+
}
411418

412419
@Override
413420
public void close() {
414421
writeBatch.close();
422+
batchCount = 0;
415423
}
416424

417425
@Override
418426
public void put(byte[] key, byte[] value) throws IOException {
419427
try {
420428
writeBatch.put(key, value);
429+
countBatchAndFlushIfNeeded();
421430
} catch (RocksDBException e) {
422431
throw new IOException("Failed to flush RocksDB batch", e);
423432
}
@@ -427,6 +436,7 @@ public void put(byte[] key, byte[] value) throws IOException {
427436
public void remove(byte[] key) throws IOException {
428437
try {
429438
writeBatch.delete(key);
439+
countBatchAndFlushIfNeeded();
430440
} catch (RocksDBException e) {
431441
throw new IOException("Failed to flush RocksDB batch", e);
432442
}
@@ -435,17 +445,31 @@ public void remove(byte[] key) throws IOException {
435445
@Override
436446
public void clear() {
437447
writeBatch.clear();
448+
batchCount = 0;
438449
}
439450

440451
@Override
441452
public void deleteRange(byte[] beginKey, byte[] endKey) throws IOException {
442453
try {
443454
writeBatch.deleteRange(beginKey, endKey);
455+
countBatchAndFlushIfNeeded();
444456
} catch (RocksDBException e) {
445457
throw new IOException("Failed to flush RocksDB batch", e);
446458
}
447459
}
448460

461+
private void countBatchAndFlushIfNeeded() throws IOException {
462+
if (++batchCount >= batchSize) {
463+
flush();
464+
clear();
465+
}
466+
}
467+
468+
@Override
469+
public int batchCount() {
470+
return batchCount;
471+
}
472+
449473
@Override
450474
public void flush() throws IOException {
451475
try {

Diff for: bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java

+24
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
310310
protected static final String AUTHORIZED_ROLES = "authorizedRoles";
311311
protected static final String ROCKSDB_DELETE_ENTRIES_BATCH_SIZE = "rocksDBDeleteEntriesBatchSize";
312312

313+
protected static final String MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH =
314+
"maxOperationNumbersInSingleRocksdbWriteBatch";
315+
313316
protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord";
314317

315318
/**
@@ -3664,4 +3667,25 @@ public ServerConfiguration setRocksDBDeleteEntriesBatchSize(int rocksDBDeleteEnt
36643667
this.setProperty(ROCKSDB_DELETE_ENTRIES_BATCH_SIZE, rocksDBDeleteEntriesBatchSize);
36653668
return this;
36663669
}
3670+
3671+
/**
3672+
* Set the max operation numbers in a single rocksdb write batch.
3673+
* The rocksdb write batch is related to the memory usage. If the batch is too large, it will cause the OOM.
3674+
*
3675+
* @param maxNumbersInSingleRocksDBBatch
3676+
* @return
3677+
*/
3678+
public ServerConfiguration setOperationMaxNumbersInSingleRocksDBWriteBatch(int maxNumbersInSingleRocksDBBatch) {
3679+
this.setProperty(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, maxNumbersInSingleRocksDBBatch);
3680+
return this;
3681+
}
3682+
3683+
/**
3684+
* Get the max operation numbers in a single rocksdb write batch.
3685+
*
3686+
* @return
3687+
*/
3688+
public int getMaxOperationNumbersInSingleRocksDBBatch() {
3689+
return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 100000);
3690+
}
36673691
}

Diff for: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java

+37
Original file line numberDiff line numberDiff line change
@@ -172,4 +172,41 @@ public void simple() throws Exception {
172172
db.close();
173173
FileUtils.deleteDirectory(tmpDir);
174174
}
175+
176+
@Test
177+
public void testBatch() throws Exception {
178+
179+
configuration.setOperationMaxNumbersInSingleRocksDBWriteBatch(5);
180+
181+
File tmpDir = Files.createTempDirectory("junitTemporaryFolder").toFile();
182+
Files.createDirectory(Paths.get(tmpDir.toString(), "subDir"));
183+
184+
KeyValueStorage db = storageFactory.newKeyValueStorage(tmpDir.toString(), "subDir", DbConfigType.Huge,
185+
configuration);
186+
187+
assertEquals(null, db.getFloor(toArray(3)));
188+
assertEquals(0, db.count());
189+
190+
Batch batch = db.newBatch();
191+
assertEquals(0, batch.batchCount());
192+
193+
batch.put(toArray(1), toArray(1));
194+
batch.put(toArray(2), toArray(2));
195+
assertEquals(2, batch.batchCount());
196+
197+
batch.put(toArray(3), toArray(3));
198+
batch.put(toArray(4), toArray(4));
199+
batch.put(toArray(5), toArray(5));
200+
assertEquals(0, batch.batchCount());
201+
batch.put(toArray(6), toArray(6));
202+
assertEquals(1, batch.batchCount());
203+
204+
batch.flush();
205+
assertEquals(1, batch.batchCount());
206+
batch.close();
207+
assertEquals(0, batch.batchCount());
208+
209+
db.close();
210+
FileUtils.deleteDirectory(tmpDir);
211+
}
175212
}

0 commit comments

Comments
 (0)