diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 7eb2039999..b48f4cff72 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1569,6 +1569,25 @@ public class ConfigOptions { "The max size of the consumed memory for RocksDB batch write, " + "will flush just based on item count if this config set to 0."); + public static final ConfigOption<MemorySize> KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE = + key("kv.preWriteBuffer.memory-pool-size") + .memoryType() + .defaultValue(MemorySize.parse("2gb")) + .withDescription( + "The max size of the off-heap memory pool for the pre-write buffer. " + + "All buckets on one tabletServer share the same buffer pool. " + + "If the used memory exceeds this config, a PreWriteBufferFullException will be thrown. " + + "The default value is 2GB."); + + public static final ConfigOption<MemorySize> KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE_PER_BUCKET = + key("kv.preWriteBuffer.memory-pool-size-per-bucket") + .memoryType() + .defaultValue(MemorySize.parse("200mb")) + .withDescription( + "The max off-heap memory size of the pre-write buffer pool that a single table bucket may use. " + + "If the buffer used by a table bucket exceeds this config, " + + "a PreWriteBufferFullException will be thrown. The default value is 200MB."); + // -------------------------------------------------------------------------- // Provided configurable ColumnFamilyOptions within Fluss // -------------------------------------------------------------------------- diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/PreWriteBufferFullException.java b/fluss-common/src/main/java/org/apache/fluss/exception/PreWriteBufferFullException.java new file mode 100644 index 0000000000..91d3d38480 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/PreWriteBufferFullException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** Thrown when the pre-write buffer memory pool is full. */ +public class PreWriteBufferFullException extends RetriableException { + private static final long serialVersionUID = 1L; + + public PreWriteBufferFullException(String message) { + super(message); + } +}
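To make the two new options concrete, here is a minimal sketch of how they feed the pool. It uses only APIs that appear elsewhere in this patch (`Configuration.setString` with an option key, `MemorySize` parsing, and the `KvPreWriteBufferMemoryPool(Configuration)` constructor added below); the no-arg `Configuration` constructor is assumed, and the sizes are illustrative, not recommendations:

```java
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool;

public class PreWriteBufferConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Override the 2GB global / 200MB per-bucket defaults, e.g. for a small test server.
        conf.setString(ConfigOptions.KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE.key(), "20mb");
        conf.setString(ConfigOptions.KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE_PER_BUCKET.key(), "5mb");

        // KvManager builds exactly one pool per tablet server from this configuration.
        KvPreWriteBufferMemoryPool pool = new KvPreWriteBufferMemoryPool(conf);
        System.out.println(pool.getMaxMemorySize());          // 20971520 (20 MiB)
        System.out.println(pool.getMaxMemorySizePerBucket()); // 5242880 (5 MiB)
    }
}
```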
diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index 4008c6b19e..86261a1a17 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -124,6 +124,12 @@ public class MetricNames { "preWriteBufferTruncateAsDuplicatedPerSecond"; public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE = "preWriteBufferTruncateAsErrorPerSecond"; + public static final String KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE = + "preWriteBufferMemoryPoolMaxSize"; + public static final String KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE_PER_BUCKET = + "preWriteBufferMemoryPoolMaxSizePerBucket"; + public static final String KV_PRE_WRITE_BUFFER_MEMORY_POOL_USAGE_SIZE = + "preWriteBufferMemoryPoolUsageSize"; // -------------------------------------------------------------------------------------------- // metrics for table bucket diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index 4b49a58475..ac3d4bfa83 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -60,6 +60,7 @@ import org.apache.fluss.exception.OutOfOrderSequenceException; import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.PreWriteBufferFullException; import org.apache.fluss.exception.RecordTooLargeException; import org.apache.fluss.exception.RetriableAuthenticationException; import org.apache.fluss.exception.SchemaNotExistException; @@ -228,7 +229,9 @@ public enum Errors { INVALID_ALTER_TABLE_EXCEPTION( 56, "The alter table is invalid.", InvalidAlterTableException::new), DELETION_DISABLED_EXCEPTION( - 57, "Deletion operations are disabled on this table.", DeletionDisabledException::new); + 57, "Deletion operations are disabled on this table.", DeletionDisabledException::new), + PRE_WRITE_BUFFER_FULL_EXCEPTION( + 58, "The pre-write buffer is full.", PreWriteBufferFullException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index b5318ef60b..09f2d2bca6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -32,7 +32,9 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metrics.MetricNames; import org.apache.fluss.server.TabletManagerBase; +import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool; import org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.LogTablet; @@ -76,6 +78,8 @@ public final class KvManager extends TabletManagerBase { private final Map<TableBucket, KvTablet> currentKvs = MapUtils.newConcurrentHashMap(); + private final KvPreWriteBufferMemoryPool
kvPreWriteBufferMemoryPool; + /** * For arrow log format. The buffer allocator to allocate memory for arrow write batch of * changelog records. @@ -104,7 +108,9 @@ private KvManager( this.zkClient = zkClient; this.remoteKvDir = FlussPaths.remoteKvDir(conf); this.remoteFileSystem = remoteKvDir.getFileSystem(); + this.kvPreWriteBufferMemoryPool = new KvPreWriteBufferMemoryPool(conf); this.serverMetricGroup = tabletServerMetricGroup; + registerMetrics(); } public static KvManager create( @@ -124,6 +130,18 @@ public static KvManager create( tabletServerMetricGroup); } + private void registerMetrics() { + serverMetricGroup.gauge( + MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_USAGE_SIZE, + kvPreWriteBufferMemoryPool::getTotalUsed); + serverMetricGroup.gauge( + MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE_PER_BUCKET, + kvPreWriteBufferMemoryPool::getMaxMemorySizePerBucket); + serverMetricGroup.gauge( + MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE, + kvPreWriteBufferMemoryPool::getMaxMemorySize); + } + public void startup() { // should do nothing now } @@ -140,6 +158,7 @@ public void shutdown() { } arrowBufferAllocator.close(); memorySegmentPool.close(); + kvPreWriteBufferMemoryPool.close(); LOG.info("Shut down KvManager complete."); } @@ -186,7 +205,8 @@ public KvTablet getOrCreateKv( merger, arrowCompressionInfo, schemaGetter, - tableConfig.getChangelogImage()); + tableConfig.getChangelogImage(), + kvPreWriteBufferMemoryPool); currentKvs.put(tableBucket, tablet); LOG.info( @@ -294,7 +314,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti rowMerger, tableConfig.getArrowCompressionInfo(), schemaGetter, - tableConfig.getChangelogImage()); + tableConfig.getChangelogImage(), + kvPreWriteBufferMemoryPool); if (this.currentKvs.containsKey(tableBucket)) { throw new IllegalStateException( String.format( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 1f1fb5f0f5..0c78042c61 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -47,6 +47,7 @@ import org.apache.fluss.row.encode.ValueDecoder; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason; +import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool; import org.apache.fluss.server.kv.rocksdb.RocksDBKv; import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder; import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer; @@ -86,6 +87,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.toByteArray; import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; @@ -113,6 +115,7 @@ public final class KvTablet { // defines how to merge rows on the same primary key private final RowMerger rowMerger; private final ArrowCompressionInfo arrowCompressionInfo; + private final KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool; private final SchemaGetter schemaGetter; @@ -143,14 +146,21 @@ private KvTablet( RowMerger rowMerger, ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, - ChangelogImage changelogImage) { + ChangelogImage changelogImage, + KvPreWriteBufferMemoryPool 
kvPreWriteBufferMemoryPool) { this.physicalPath = physicalPath; this.tableBucket = tableBucket; this.logTablet = logTablet; this.kvTabletDir = kvTabletDir; this.rocksDBKv = rocksDBKv; this.writeBatchSize = writeBatchSize; - this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter(), serverMetricGroup); + this.kvPreWriteBufferMemoryPool = kvPreWriteBufferMemoryPool; + this.kvPreWriteBuffer = + new KvPreWriteBuffer( + tableBucket, + createKvBatchWriter(), + kvPreWriteBufferMemoryPool, + serverMetricGroup); this.logFormat = logFormat; this.arrowWriterProvider = new ArrowWriterPool(arrowBufferAllocator); this.memorySegmentPool = memorySegmentPool; @@ -172,7 +182,8 @@ public static KvTablet create( RowMerger rowMerger, ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, - ChangelogImage changelogImage) + ChangelogImage changelogImage, + KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool) throws IOException { Tuple2<TablePath, TableBucket> tablePathAndBucket = FlussPaths.parseTabletDir(kvTabletDir); @@ -189,7 +200,8 @@ public static KvTablet create( rowMerger, arrowCompressionInfo, schemaGetter, - changelogImage); + changelogImage, + kvPreWriteBufferMemoryPool); } public static KvTablet create( @@ -205,7 +217,8 @@ public static KvTablet create( RowMerger rowMerger, ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, - ChangelogImage changelogImage) + ChangelogImage changelogImage, + KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool) throws IOException { RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir); return new KvTablet( @@ -223,7 +236,8 @@ public static KvTablet create( rowMerger, arrowCompressionInfo, schemaGetter, - changelogImage); + changelogImage, + kvPreWriteBufferMemoryPool); } private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir) @@ -263,6 +277,10 @@ public long getFlushedLogOffset() { return flushedLogOffset; } + public long getPreWriteBufferMemoryPoolUsage() { + return kvPreWriteBufferMemoryPool.getPerBucketUsage(tableBucket); + } +
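Because `KvManager` threads one pool instance through every `KvTablet` it creates, per-bucket accounting (surfaced by `getPreWriteBufferMemoryPoolUsage()` above) and the global total move independently. A small sketch of that invariant against the pool API added in this patch; the limits mirror what `KvTabletTest` uses, and the bucket ids are arbitrary:

```java
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool;

import java.nio.ByteBuffer;

public class SharedPoolAccountingSketch {
    public static void main(String[] args) {
        // 10 MiB globally, 1 MiB per bucket.
        KvPreWriteBufferMemoryPool pool =
                new KvPreWriteBufferMemoryPool(10 * 1024 * 1024, 1024 * 1024);
        TableBucket b0 = new TableBucket(1, 0);
        TableBucket b1 = new TableBucket(1, 1);

        // Each allocation copies the value into a new direct buffer and is
        // charged to both the owning bucket and the global total.
        ByteBuffer v0 = pool.allocate(b0, new byte[256]);
        ByteBuffer v1 = pool.allocate(b1, new byte[512]);
        assert pool.getTotalUsed() == 768;
        assert pool.getPerBucketUsage(b0) == 256;
        assert pool.getPerBucketUsage(b1) == 512;

        // Releasing (as flush() and truncateTo() do) returns both counters to zero.
        pool.release(b0, v0);
        pool.release(b1, v1);
        assert pool.getTotalUsed() == 0 && pool.getPerBucketUsage(b0) == 0;
    }
}
```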
/** * Put the KvRecordBatch into the kv storage, and return the appended wal log info. * @@ -610,7 +628,9 @@ private byte[] getFromBufferOrKv(KvPreWriteBuffer.Key key) throws IOException { if (value == null) { return rocksDBKv.get(key.get()); } - return value.get(); + + // TODO maybe we can use ByteBuffer to avoid copy + return toByteArray(value.get()); } public List multiGet(List keys) throws IOException { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java index 255d7a29ef..4c4c051cc7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java @@ -19,6 +19,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metrics.Counter; import org.apache.fluss.metrics.Histogram; import org.apache.fluss.server.kv.KvBatchWriter; @@ -29,7 +30,7 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; -import java.util.Arrays; +import java.nio.ByteBuffer; import java.util.Base64; import java.util.HashMap; import java.util.Iterator; @@ -86,7 +87,9 @@ */ @NotThreadSafe public class KvPreWriteBuffer implements AutoCloseable { + private final TableBucket tb; private final KvBatchWriter kvBatchWriter; + private final KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool; // a mapping from the key to the kv-entry private final Map<Key, KvEntry> kvEntryMap = new HashMap<>(); @@ -104,8 +107,13 @@ public class KvPreWriteBuffer implements AutoCloseable { private long maxLogSequenceNumber = -1; public KvPreWriteBuffer( - KvBatchWriter kvBatchWriter, TabletServerMetricGroup serverMetricGroup) { + TableBucket tb, + KvBatchWriter kvBatchWriter, + KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool, + TabletServerMetricGroup serverMetricGroup) { + this.tb = tb; this.kvBatchWriter = kvBatchWriter; + this.kvPreWriteBufferMemoryPool = kvPreWriteBufferMemoryPool; flushCount = serverMetricGroup.kvFlushCount(); flushLatencyHistogram = serverMetricGroup.kvFlushLatencyHistogram(); @@ -128,7 +136,13 @@ public void delete(Key key, long logSequenceNumber) { * @param logSequenceNumber the log sequence number for the put operation */ public void put(Key key, @Nullable byte[] value, long logSequenceNumber) { - update(key, Value.of(value), logSequenceNumber); + if (value == null) { + update(key, Value.of(null), logSequenceNumber); + } else { + // allocate a new byte buffer from memory pool.
+ ByteBuffer newBuffer = kvPreWriteBufferMemoryPool.allocate(tb, value); + update(key, Value.of(newBuffer), logSequenceNumber); + } } private void update(Key key, Value value, long lsn) { @@ -189,7 +203,13 @@ public void truncateTo(long targetLogSequenceNumber, TruncateReason truncateReas break; } descIter.remove(); + boolean removed = kvEntryMap.remove(entry.getKey(), entry); + Value value = entry.getValue(); + if (value.value != null) { + kvPreWriteBufferMemoryPool.release(tb, value.value); + } + // if the latest entry is removed, we need to rollback the previous entry to the map if (removed && entry.previousEntry != null) { kvEntryMap.put(entry.getKey(), entry.previousEntry); @@ -223,7 +243,7 @@ public void flush(long exclusiveUpToLogSequenceNumber) throws IOException { Value value = entry.getValue(); if (value.value != null) { flushedCount += 1; - kvBatchWriter.put(entry.getKey().key, value.value); + kvBatchWriter.put(entry.getKey().key, toByteArray(value.value)); } else { flushedCount += 1; kvBatchWriter.delete(entry.getKey().key); @@ -233,6 +253,10 @@ public void flush(long exclusiveUpToLogSequenceNumber) throws IOException { // can remove it from the map. Although it's not a must to remove from the map, // we remove it to reduce the memory usage kvEntryMap.remove(entry.getKey(), entry); + if (value.value != null) { + // release the buffer to buffer pool + kvPreWriteBufferMemoryPool.release(tb, value.value); + } } // flush to underlying kv tablet if (flushedCount > 0) { @@ -423,19 +447,19 @@ public String toString() { * {@link KvEntry} with the value is for key deletion. */ public static class Value { - private final @Nullable byte[] value; + private final @Nullable ByteBuffer value; - private Value(@Nullable byte[] value) { + private Value(@Nullable ByteBuffer value) { this.value = value; } - public static Value of(@Nullable byte[] value) { + public static Value of(@Nullable ByteBuffer value) { return new Value(value); } /** Return the value. Return null if marked as deleted. */ @Nullable - public byte[] get() { + public ByteBuffer get() { return value; } @@ -448,17 +472,27 @@ public boolean equals(Object o) { return false; } Value value1 = (Value) o; - return Arrays.equals(value, value1.value); + return Objects.equals(value, value1.value); } @Override public int hashCode() { - return Arrays.hashCode(value); + return Objects.hash(value); } + } - @Override - public String toString() { - return value == null ? "null" : "[" + Base64.getEncoder().encodeToString(value) + "]"; + public static byte[] toByteArray(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + int originalPosition = buffer.position(); + try { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } finally { + buffer.position(originalPosition); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferMemoryPool.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferMemoryPool.java new file mode 100644 index 0000000000..b4addde20c --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferMemoryPool.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
* The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.prewrite; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.PreWriteBufferFullException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.MapUtils; + +import javax.annotation.concurrent.GuardedBy; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.fluss.utils.concurrent.LockUtils.inLock; + +/** A shared off-heap memory pool for the {@link KvPreWriteBuffer}s of different table buckets. */ +public class KvPreWriteBufferMemoryPool { + + /** The maximum memory size for this memory pool. */ + private final long maxMemorySize; + + /** The maximum memory size for this memory pool per bucket. */ + private final long maxMemorySizePerBucket; + + private final Lock lock = new ReentrantLock(); + + /** The total used memory size; updated under {@code lock}, read lock-free by metrics. */ + @GuardedBy("lock") + private final AtomicLong totalUsedBytes = new AtomicLong(0); + + /** Memory allocated per bucket. */ + @GuardedBy("lock") + private final Map<TableBucket, Long> allocatedBufferForBucket = MapUtils.newConcurrentHashMap(); + + public KvPreWriteBufferMemoryPool(Configuration conf) { + this( + conf.get(ConfigOptions.KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE).getBytes(), + conf.get(ConfigOptions.KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE_PER_BUCKET).getBytes()); + } + + @VisibleForTesting + public KvPreWriteBufferMemoryPool(long maxMemorySize, long maxMemorySizePerBucket) { + this.maxMemorySize = maxMemorySize; + this.maxMemorySizePerBucket = maxMemorySizePerBucket; + } + + public long getTotalUsed() { + return totalUsedBytes.get(); + } + + public long getPerBucketUsage(TableBucket bucket) { + return allocatedBufferForBucket.getOrDefault(bucket, 0L); + } + + public long getMaxMemorySize() { + return maxMemorySize; + } + + public long getMaxMemorySizePerBucket() { + return maxMemorySizePerBucket; + } + + /** + * Allocate a buffer and copy the input value into the new buffer. + * + * @param bucket the bucket of the table. + * @param value the value to copy. + * @return the new buffer. + */ + ByteBuffer allocate(TableBucket bucket, ByteBuffer value) { + if (value == null) { + throw new NullPointerException("The input buffer cannot be null"); + } + int size = value.remaining(); + ByteBuffer newBuffer = allocate(bucket, size); + newBuffer.put(value.duplicate()); + newBuffer.flip(); + return newBuffer; + } + + /** + * Allocate a buffer and copy the input value into the new buffer. + * + * @param bucket the bucket of the table. + * @param value the value to copy. + * @return the new buffer. + */ + public ByteBuffer allocate(TableBucket bucket, byte[] value) { + if (value == null) { + throw new NullPointerException("The input value cannot be null"); + } + return allocate(bucket, ByteBuffer.wrap(value)); + } + + /** + * Allocate a buffer with the given size. + * + * @param bucket the bucket of the table. + * @param size the size of the buffer. + * @return the new buffer. + */ + private ByteBuffer allocate(TableBucket bucket, int size) { + if (size <= 0) { + throw new IllegalArgumentException("Allocation size must be > 0"); + } + + inLock( + lock, + () -> { + long currentBucketUsage = allocatedBufferForBucket.getOrDefault(bucket, 0L); + + // check global memory limit. + long currentTotal = totalUsedBytes.get(); + long newTotal = currentTotal + size; + + if (newTotal > maxMemorySize) { + throw new PreWriteBufferFullException( + "Global PreWriteBuffer memory pool exhausted. Requested: " + + size + + " bytes, Total used: " + + currentTotal + + ", Limit: " + + maxMemorySize); + } + + // check per bucket memory limit. + long newBucketUsage = currentBucketUsage + size; + if (newBucketUsage > maxMemorySizePerBucket) { + throw new PreWriteBufferFullException( + "PreWriteBuffer memory limit exceeded for table bucket: " + + bucket + + ", Current: " + + currentBucketUsage + + ", Request: " + + size + + ", Limit: " + + maxMemorySizePerBucket); + } + + allocatedBufferForBucket.put(bucket, newBucketUsage); + totalUsedBytes.addAndGet(size); + }); + + // TODO Maybe we can cache small buffers to avoid frequent allocation and release. + // Allocate the new buffer outside the lock. + return ByteBuffer.allocateDirect(size); + } + + /** + * Release the buffer. + * + * @param bucket the bucket of the table. + * @param buffer the buffer to release. + */ + public void release(TableBucket bucket, ByteBuffer buffer) { + if (buffer == null || !buffer.isDirect()) { + return; + } + + int size = buffer.capacity(); + inLock( + lock, + () -> { + Long currentBucketUsage = allocatedBufferForBucket.get(bucket); + if (currentBucketUsage == null || currentBucketUsage < size) { + throw new IllegalStateException( + "Release more than allocated for bucket: " + bucket); + } + + long newUsage = currentBucketUsage - size; + if (newUsage <= 0) { + allocatedBufferForBucket.remove(bucket); + } else { + allocatedBufferForBucket.put(bucket, newUsage); + } + + totalUsedBytes.addAndGet(-size); + }); + } + + public void close() { + // do nothing now. + } +}
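When either limit would be exceeded, `allocate` throws the retriable `PreWriteBufferFullException` before any buffer is created. A caller-side sketch of that behavior, using only the pool API above (the tiny limits and bucket id are chosen purely for illustration):

```java
import org.apache.fluss.exception.PreWriteBufferFullException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool;

import java.nio.ByteBuffer;

public class PoolExhaustionSketch {
    public static void main(String[] args) {
        // 1024 bytes globally, 512 bytes per bucket.
        KvPreWriteBufferMemoryPool pool = new KvPreWriteBufferMemoryPool(1024, 512);
        TableBucket bucket = new TableBucket(7, 0);

        ByteBuffer first = pool.allocate(bucket, new byte[400]); // ok: 400 <= 512
        try {
            pool.allocate(bucket, new byte[200]); // 400 + 200 exceeds the per-bucket limit
        } catch (PreWriteBufferFullException e) {
            // Retriable: usage drops again once flushed entries are released,
            // so the server can back off and retry the write.
            System.out.println(e.getMessage());
        } finally {
            pool.release(bucket, first);
        }
    }
}
```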
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 1669e004df..249c76d467 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -265,6 +265,12 @@ private void registerMetrics() { logicalStorageMetrics.gauge( MetricNames.LOCAL_STORAGE_LOG_SIZE, this::logicalStorageLogSize); logicalStorageMetrics.gauge(MetricNames.LOCAL_STORAGE_KV_SIZE, this::logicalStorageKvSize); + + // Pre-write buffer memory pool usage of this bucket.
+ MetricGroup kvMetrics = bucketMetricGroup.addGroup("kv"); + kvMetrics.gauge( + MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_USAGE_SIZE, + this::kvPreWriteBufferMemoryPoolUsage); } public long logicalStorageLogSize() { @@ -286,6 +292,14 @@ public long logicalStorageKvSize() { } } + public long kvPreWriteBufferMemoryPoolUsage() { + if (isLeader() && isKvTable()) { + return kvTablet.getPreWriteBufferMemoryPoolUsage(); + } else { + return 0L; + } + } + public boolean isKvTable() { return kvManager != null; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 94d41bffe0..ef991f336a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -47,6 +47,7 @@ import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value; +import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool; import org.apache.fluss.server.kv.rowmerger.RowMerger; import org.apache.fluss.server.log.FetchIsolation; import org.apache.fluss.server.log.LogAppendInfo; @@ -70,6 +71,7 @@ import javax.annotation.Nullable; import java.io.File; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -191,7 +193,8 @@ private KvTablet createKvTablet( rowMerger, DEFAULT_COMPRESSION, schemaGetter, - tableConf.getChangelogImage()); + tableConf.getChangelogImage(), + new KvPreWriteBufferMemoryPool(10 * 1024 * 1024, 1024 * 1024)); } @Test @@ -1238,6 +1241,6 @@ private void checkEqual(LogRecords actaulLogRecords, List expe } private Value valueOf(BinaryRow row) { - return Value.of(ValueEncoder.encodeValue(schemaId, row)); + return Value.of(ByteBuffer.wrap(ValueEncoder.encodeValue(schemaId, row))); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferMemoryPoolTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferMemoryPoolTest.java new file mode 100644 index 0000000000..f2d3df20db --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferMemoryPoolTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ + +package org.apache.fluss.server.kv.prewrite; + +import org.apache.fluss.exception.PreWriteBufferFullException; +import org.apache.fluss.metadata.TableBucket; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link KvPreWriteBufferMemoryPool}. */ +public class KvPreWriteBufferMemoryPoolTest { + private static final long GLOBAL_LIMIT = 1000 * 1024; // 1,024,000 bytes total + private static final long PER_BUCKET_LIMIT = 600 * 1024; // 600KB per bucket + + private KvPreWriteBufferMemoryPool pool; + private TableBucket tb1; + private TableBucket tb2; + + @BeforeEach + void setup() { + pool = new KvPreWriteBufferMemoryPool(GLOBAL_LIMIT, PER_BUCKET_LIMIT); + tb1 = new TableBucket(0, 1); + tb2 = new TableBucket(0, 2); + } + + @AfterEach + void teardown() { + if (pool != null) { + pool.close(); + } + } + + @Test + void testSuccessAllocateAndRelease() { + byte[] data = new byte[] {0, 1, 2}; + ByteBuffer src = ByteBuffer.wrap(data); + ByteBuffer buffer = pool.allocate(tb1, src); + + assertThat(buffer).isNotNull(); + assertThat(buffer.isDirect()).isTrue(); + assertThat(buffer.capacity()).isEqualTo(3); + assertThat(buffer.position()).isZero(); + assertThat(buffer.limit()).isEqualTo(3); + + // Verify content + ByteBuffer dup = buffer.duplicate(); + byte[] actual = new byte[dup.remaining()]; + dup.get(actual); + assertThat(actual).hasSize(3).containsExactly(data); + + // Release + pool.release(tb1, buffer); + + assertThat(pool.getTotalUsed()).isZero(); + assertThat(pool.getPerBucketUsage(tb1)).isZero(); + } + + @Test + void testEnforcePerBucketLimit() { + int chunk = (int) (PER_BUCKET_LIMIT / 3); + + // First two chunks OK + pool.allocate(tb1, new byte[chunk]); + pool.allocate(tb1, new byte[chunk]); + + assertThat(pool.getPerBucketUsage(tb1)).isEqualTo(chunk * 2); + + // Third chunk exceeds limit. + assertThatThrownBy(() -> pool.allocate(tb1, new byte[chunk + 1])) + .isInstanceOf(PreWriteBufferFullException.class) + .hasMessageContaining( + "PreWriteBuffer memory limit exceeded for table bucket: " + + "TableBucket{tableId=0, bucket=1}, Current: 409600, Request: 204801, Limit: 614400"); + } + + @Test + void testEnforceGlobalMemoryLimit() { + int chunk = (int) (GLOBAL_LIMIT / 3); + + pool.allocate(tb1, new byte[chunk]); + pool.allocate(tb2, new byte[chunk]); + + assertThat(pool.getTotalUsed()).isEqualTo(chunk * 2); + + // Third allocation exceeds global limit. + assertThatThrownBy(() -> pool.allocate(tb1, new byte[chunk + 5])) + .isInstanceOf(PreWriteBufferFullException.class) + .hasMessageContaining( + "Global PreWriteBuffer memory pool exhausted. 
Requested: 341338 bytes, " + + "Total used: 682666, Limit: 1024000"); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java index b39ec39ac6..2a9a03b9e3 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java @@ -17,25 +17,41 @@ package org.apache.fluss.server.kv.prewrite; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.server.kv.KvBatchWriter; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason; import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; +import java.nio.ByteBuffer; + +import static org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.toByteArray; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer}. */ class KvPreWriteBufferTest { + private final TableBucket tb = new TableBucket(1, 0); + private KvPreWriteBufferMemoryPool memoryPool; + + @BeforeEach + void setup() { + memoryPool = new KvPreWriteBufferMemoryPool(10 * 1024 * 1024, 1024 * 1024); + } + @Test void testIllegalLSN() { KvPreWriteBuffer buffer = new KvPreWriteBuffer( - new NopKvBatchWriter(), TestingMetricGroups.TABLET_SERVER_METRICS); + tb, + new NopKvBatchWriter(), + memoryPool, + TestingMetricGroups.TABLET_SERVER_METRICS); bufferPut(buffer, "key1", "value1", 1); bufferDelete(buffer, "key1", 3); @@ -56,7 +72,10 @@ void testIllegalLSN() { void testWriteAndFlush() throws Exception { KvPreWriteBuffer buffer = new KvPreWriteBuffer( - new NopKvBatchWriter(), TestingMetricGroups.TABLET_SERVER_METRICS); + tb, + new NopKvBatchWriter(), + memoryPool, + TestingMetricGroups.TABLET_SERVER_METRICS); int elementCount = 0; // put a series of kv entries @@ -139,7 +158,10 @@ void testWriteAndFlush() throws Exception { void testTruncate() { KvPreWriteBuffer buffer = new KvPreWriteBuffer( - new NopKvBatchWriter(), TestingMetricGroups.TABLET_SERVER_METRICS); + tb, + new NopKvBatchWriter(), + memoryPool, + TestingMetricGroups.TABLET_SERVER_METRICS); int elementCount = 0; // put a series of kv entries @@ -204,8 +226,8 @@ private static String getValue(KvPreWriteBuffer preWriteBuffer, String keyStr) { KvPreWriteBuffer.Key key = toKey(keyStr); KvPreWriteBuffer.Value value = preWriteBuffer.get(key); if (value != null && value.get() != null) { - byte[] bytes = value.get(); - return bytes != null ? new String(bytes) : null; + ByteBuffer bytes = value.get(); + return bytes != null ? 
new String(toByteArray(bytes)) : null; } else { return null; } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index a1ee83fa69..815a167de5 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -316,6 +316,10 @@ private void startTabletServer(int serverId, @Nullable Configuration overwriteCo tabletServerConf.set(ConfigOptions.DATA_DIR, dataDir); tabletServerConf.setString( ConfigOptions.ZOOKEEPER_ADDRESS, zooKeeperServer.getConnectString()); + tabletServerConf.setString( + ConfigOptions.KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE.key(), "20MB"); + tabletServerConf.setString( + ConfigOptions.KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE_PER_BUCKET.key(), "5MB"); tabletServerConf.setString(ConfigOptions.BIND_LISTENERS, tabletServerListeners); if (overwriteConfig != null) { tabletServerConf.addAll(overwriteConfig); diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index f6988aacef..71f4fce8ef 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -131,33 +131,36 @@ during the Fluss cluster working. ## Kv -| Option | Type | Default | Description | |---------------------------------------------------|------------|-------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| kv.snapshot.interval | Duration | 10min | The interval to perform periodic snapshot for kv data. The default setting is 10 minutes. | -| kv.snapshot.scheduler-thread-num | Integer | 1 | The number of threads that the server uses to schedule snapshot kv data for all the replicas in the server. | -| kv.snapshot.transfer-thread-num | Integer | 4 | **Deprecated**: This option is deprecated. Please use `server.io-pool.size` instead. 
The number of threads the server uses to transfer (download and upload) kv snapshot files. | -| kv.snapshot.num-retained | Integer | 1 | The maximum number of completed snapshots to retain. | -| kv.rocksdb.thread.num | Integer | 2 | The maximum number of concurrent background flush and compaction jobs (per bucket of table). The default value is `2`. | -| kv.rocksdb.files.open | Integer | -1 | The maximum number of open files (per bucket of table) that can be used by the DB, `-1` means no limit. The default value is `-1`. | -| kv.rocksdb.log.max-file-size | MemorySize | 25mb | The maximum size of RocksDB's file used for information logging. If the log files becomes larger than this, a new file will be created. If 0, all logs will be written to one log file. The default maximum file size is `25MB`. | -| kv.rocksdb.log.file-num | Integer | 4 | The maximum number of files RocksDB should keep for information logging (Default setting: 4). | -| kv.rocksdb.log.dir | String | `${FLUSS_HOME}/log/rocksdb` | The directory for RocksDB's information logging files. If empty (Fluss default setting), log files will be in the same directory as the Fluss log. If non-empty, this directory will be used and the data directory's absolute path will be used as the prefix of the log file name. If setting this option as a non-existing location, e.g `/dev/null`, RocksDB will then create the log under its own database folder as before. | -| kv.rocksdb.log.level | Enum | INFO_LEVEL | The specified information logging level for RocksDB. Candidate log level is `DEBUG_LEVEL`, `INFO_LEVEL`, `WARN_LEVEL`, `ERROR_LEVEL`, `FATAL_LEVEL`, `HEADER_LEVEL`, NUM_INFO_LOG_LEVELS, . If unset, Fluss will use INFO_LEVEL. Note: RocksDB info logs will not be written to the Fluss's tablet server logs and there is no rolling strategy, unless you configure `kv.rocksdb.log.dir`, `kv.rocksdb.log.max-file-size` and `kv.rocksdb.log.file-num` accordingly. Without a rolling strategy, it may lead to uncontrolled disk space usage if configured with increased log levels! There is no need to modify the RocksDB log level, unless for troubleshooting RocksDB. | -| kv.rocksdb.write-batch-size | MemorySize | 2mb | The max size of the consumed memory for RocksDB batch write, will flush just based on item count if this config set to 0. | -| kv.rocksdb.compaction.style | Enum | LEVEL | The specified compaction style for DB. Candidate compaction style is LEVEL, FIFO, UNIVERSAL, or NONE, and Fluss chooses `LEVEL` as default style. | -| kv.rocksdb.compaction.level.use-dynamic-size | Boolean | false | If true, RocksDB will pick target size of each level dynamically. From an empty DB, RocksDB would make last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. And then repeat this process for second last level and so on. The default value is `false`. For more information, please refer to RocksDB's [doc](https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true) . | -| kv.rocksdb.compression.per.level | Enum | LZ4,LZ4,LZ4,LZ4,LZ4,ZSTD,ZSTD | A comma-separated list of Compression Type. Different levels can have different compression policies. In many cases, lower levels use fast compression algorithms, while higher levels with more data use slower but more effective compression algorithms. 
The N th element in the List corresponds to the compression type of the level N-1 When `kv.rocksdb.compaction.level.use-dynamic-size` is true, compression_per_level[0] still determines L0, but other elements are based on the base level and may not match the level seen in the info log. Note: If the List size is smaller than the level number, the undefined lower level uses the last Compression Type in the List. The optional values include NO, SNAPPY, LZ4, ZSTD. For more information about compression type, please refer to [doc](https://github.com/facebook/rocksdb/wiki/Compression). The default value is ‘LZ4,LZ4,LZ4,LZ4,LZ4,ZSTD,ZSTD’, indicates there is lz4 compaction of level0 and level4, ZSTD compaction algorithm is used from level5 to level6. LZ4 is a lightweight compression algorithm so it usually strikes a good balance between space and CPU usage. ZSTD is more space save than LZ4, but it is more CPU-intensive. Different machines deploy compaction modes according to CPU and I/O resources. The default value is for the scenario that CPU resources are adequate. If you find the IO pressure of the system is not big when writing a lot of data, but CPU resources are inadequate, you can exchange I/O resources for CPU resources and change the compaction mode to `NO,NO,NO,LZ4,LZ4,ZSTD,ZSTD`. | -| kv.rocksdb.compaction.level.target-file-size-base | MemorySize | 64mb | The target file size for compaction, which determines a level-1 file size. The default value is `64MB`. | -| kv.rocksdb.compaction.level.max-size-level-base | MemorySize | 256mb | The upper-bound of the total size of level base files in bytes. The default value is `256MB`. | -| kv.rocksdb.writebuffer.size | MemorySize | 64mb | The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. The default writebuffer size is `64MB`. | -| kv.rocksdb.writebuffer.count | Integer | 2 | The maximum number of write buffers that are built up in memory. The default value is `2`. | -| kv.rocksdb.writebuffer.number-to-merge | Integer | 1 | The minimum number of write buffers that will be merged together before writing to storage. The default value is `1`. | -| kv.rocksdb.block.blocksize | MemorySize | 4kb | The approximate size (in bytes) of user data packed per block. The default blocksize is `4KB`. | -| kv.rocksdb.block.cache-size | MemorySize | 8mb | The amount of the cache for data blocks in RocksDB. The default block-cache size is `8MB`. | -| kv.rocksdb.use-bloom-filter | Boolean | true | If true, every newly created SST file will contain a Bloom filter. It is enabled by default. | -| kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0 | Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0. | -| kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. | -| kv.recover.log-record-batch.max-size | MemorySize | 16mb | The max fetch size for fetching log to apply to kv during recovering kv. 
| +| Option | Type | Default | Description | +|-----------------------------------------------------|--------------|-------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| kv.snapshot.interval | Duration | 10min | The interval to perform periodic snapshot for kv data. The default setting is 10 minutes. | +| kv.snapshot.scheduler-thread-num | Integer | 1 | The number of threads that the server uses to schedule snapshot kv data for all the replicas in the server. | +| kv.snapshot.transfer-thread-num | Integer | 4 | **Deprecated**: This option is deprecated. Please use `server.io-pool.size` instead. The number of threads the server uses to transfer (download and upload) kv snapshot files. | +| kv.snapshot.num-retained | Integer | 1 | The maximum number of completed snapshots to retain. | +| kv.rocksdb.thread.num | Integer | 2 | The maximum number of concurrent background flush and compaction jobs (per bucket of table). The default value is `2`. | +| kv.rocksdb.files.open | Integer | -1 | The maximum number of open files (per bucket of table) that can be used by the DB, `-1` means no limit. The default value is `-1`. | +| kv.rocksdb.log.max-file-size | MemorySize | 25mb | The maximum size of RocksDB's file used for information logging. If the log files becomes larger than this, a new file will be created. If 0, all logs will be written to one log file. The default maximum file size is `25MB`. | +| kv.rocksdb.log.file-num | Integer | 4 | The maximum number of files RocksDB should keep for information logging (Default setting: 4). | +| kv.rocksdb.log.dir | String | `${FLUSS_HOME}/log/rocksdb` | The directory for RocksDB's information logging files. If empty (Fluss default setting), log files will be in the same directory as the Fluss log. If non-empty, this directory will be used and the data directory's absolute path will be used as the prefix of the log file name. 
If setting this option as a non-existing location, e.g `/dev/null`, RocksDB will then create the log under its own database folder as before. | +| kv.rocksdb.log.level | Enum | INFO_LEVEL | The specified information logging level for RocksDB. Candidate log level is `DEBUG_LEVEL`, `INFO_LEVEL`, `WARN_LEVEL`, `ERROR_LEVEL`, `FATAL_LEVEL`, `HEADER_LEVEL`, NUM_INFO_LOG_LEVELS, . If unset, Fluss will use INFO_LEVEL. Note: RocksDB info logs will not be written to the Fluss's tablet server logs and there is no rolling strategy, unless you configure `kv.rocksdb.log.dir`, `kv.rocksdb.log.max-file-size` and `kv.rocksdb.log.file-num` accordingly. Without a rolling strategy, it may lead to uncontrolled disk space usage if configured with increased log levels! There is no need to modify the RocksDB log level, unless for troubleshooting RocksDB. | +| kv.rocksdb.write-batch-size | MemorySize | 2mb | The max size of the consumed memory for RocksDB batch write, will flush just based on item count if this config set to 0. | +| kv.rocksdb.compaction.style | Enum | LEVEL | The specified compaction style for DB. Candidate compaction style is LEVEL, FIFO, UNIVERSAL, or NONE, and Fluss chooses `LEVEL` as default style. | +| kv.rocksdb.compaction.level.use-dynamic-size | Boolean | false | If true, RocksDB will pick target size of each level dynamically. From an empty DB, RocksDB would make last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. And then repeat this process for second last level and so on. The default value is `false`. For more information, please refer to RocksDB's [doc](https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true) . | +| kv.rocksdb.compression.per.level | Enum | LZ4,LZ4,LZ4,LZ4,LZ4,ZSTD,ZSTD | A comma-separated list of Compression Type. Different levels can have different compression policies. In many cases, lower levels use fast compression algorithms, while higher levels with more data use slower but more effective compression algorithms. The N th element in the List corresponds to the compression type of the level N-1 When `kv.rocksdb.compaction.level.use-dynamic-size` is true, compression_per_level[0] still determines L0, but other elements are based on the base level and may not match the level seen in the info log. Note: If the List size is smaller than the level number, the undefined lower level uses the last Compression Type in the List. The optional values include NO, SNAPPY, LZ4, ZSTD. For more information about compression type, please refer to [doc](https://github.com/facebook/rocksdb/wiki/Compression). The default value is ‘LZ4,LZ4,LZ4,LZ4,LZ4,ZSTD,ZSTD’, indicates there is lz4 compaction of level0 and level4, ZSTD compaction algorithm is used from level5 to level6. LZ4 is a lightweight compression algorithm so it usually strikes a good balance between space and CPU usage. ZSTD is more space save than LZ4, but it is more CPU-intensive. Different machines deploy compaction modes according to CPU and I/O resources. The default value is for the scenario that CPU resources are adequate. If you find the IO pressure of the system is not big when writing a lot of data, but CPU resources are inadequate, you can exchange I/O resources for CPU resources and change the compaction mode to `NO,NO,NO,LZ4,LZ4,ZSTD,ZSTD`. | +| kv.rocksdb.compaction.level.target-file-size-base | MemorySize | 64mb | The target file size for compaction, which determines a level-1 file size. 
The default value is `64MB`. | +| kv.rocksdb.compaction.level.max-size-level-base | MemorySize | 256mb | The upper-bound of the total size of level base files in bytes. The default value is `256MB`. | +| kv.rocksdb.writebuffer.size | MemorySize | 64mb | The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. The default writebuffer size is `64MB`. | +| kv.rocksdb.writebuffer.count | Integer | 2 | The maximum number of write buffers that are built up in memory. The default value is `2`. | +| kv.rocksdb.writebuffer.number-to-merge | Integer | 1 | The minimum number of write buffers that will be merged together before writing to storage. The default value is `1`. | +| kv.rocksdb.block.blocksize | MemorySize | 4kb | The approximate size (in bytes) of user data packed per block. The default blocksize is `4KB`. | +| kv.rocksdb.block.cache-size | MemorySize | 8mb | The amount of the cache for data blocks in RocksDB. The default block-cache size is `8MB`. | +| kv.rocksdb.use-bloom-filter | Boolean | true | If true, every newly created SST file will contain a Bloom filter. It is enabled by default. | +| kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0 | Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0. | +| kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. | +| kv.recover.log-record-batch.max-size | MemorySize | 16mb | The max fetch size for fetching log to apply to kv during recovering kv. | +| kv.preWriteBuffer.memory-pool-size | MemorySize | 2gb | The max size of the off-heap memory pool for the pre-write buffer. All buckets on one tabletServer share the same buffer pool. If the used memory exceeds this config, a PreWriteBufferFullException will be thrown. The default value is 2GB. | +| kv.preWriteBuffer.memory-pool-size-per-bucket | MemorySize | 200mb | The max off-heap memory size of the pre-write buffer pool that a single table bucket may use. If the buffer used by a table bucket exceeds this config, a PreWriteBufferFullException will be thrown. The default value is 200MB. | + ## Metrics