Skip to content

Commit 7e693a9

Browse files
committed
[kv] Introduce a shared memory cache for PrewriteBuffer between different replica
1 parent b432b14 commit 7e693a9

File tree

14 files changed

+551
-57
lines changed

14 files changed

+551
-57
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,25 @@ public class ConfigOptions {
14951495
"The max size of the consumed memory for RocksDB batch write, "
14961496
+ "will flush just based on item count if this config set to 0.");
14971497

1498+
public static final ConfigOption<MemorySize> KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE =
1499+
key("kv.preWriteBuffer.memory-pool-size")
1500+
.memoryType()
1501+
.defaultValue(MemorySize.parse("2gb"))
1502+
.withDescription(
1503+
"The max size of the off-heap memory pool for pre-write buffer. "
1504+
+ "All buckets will share the same buffer pool in one tabletServer. "
1505+
+ "If the used memory exceeds this config, PreWriteBufferFullException will be thrown. "
1506+
+ "The default value is 2GB.");
1507+
1508+
public static final ConfigOption<MemorySize> KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZ_PER_BUCKET =
1509+
key("kv.preWriteBuffer.memory-pool-size-per-bucket")
1510+
.memoryType()
1511+
.defaultValue(MemorySize.parse("200mb"))
1512+
.withDescription(
1513+
"The max size of the off-heap memory buffer pool for pre-write buffer of each table bucket. "
1514+
+ "If there is a table bucket who use the buffer size exceeds this config, "
1515+
+ "PreWriteBufferFullException will be thrown. The default value is 200MB.");
1516+
14981517
// --------------------------------------------------------------------------
14991518
// Provided configurable ColumnFamilyOptions within Fluss
15001519
// --------------------------------------------------------------------------
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.exception;
19+
20+
/** Pre-write buffer is full. */
21+
public class PreWriteBufferFullException extends RetriableException {
22+
private static final long serialVersionUID = 1L;
23+
24+
public PreWriteBufferFullException(String message) {
25+
super(message);
26+
}
27+
}

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ public class MetricNames {
124124
"preWriteBufferTruncateAsDuplicatedPerSecond";
125125
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
126126
"preWriteBufferTruncateAsErrorPerSecond";
127+
public static final String KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE =
128+
"preWriteBufferMemoryPoolMaxSize";
129+
public static final String KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE_PER_BUCKET =
130+
"preWriteBufferMemoryPoolMaxSizePerBucket";
131+
public static final String KV_PRE_WRITE_BUFFER_MEMORY_POOL_USAGE_SIZE =
132+
"preWriteBufferMemoryPoolUsageSize";
127133

128134
// --------------------------------------------------------------------------------------------
129135
// metrics for table bucket

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.fluss.exception.OutOfOrderSequenceException;
6161
import org.apache.fluss.exception.PartitionAlreadyExistsException;
6262
import org.apache.fluss.exception.PartitionNotExistException;
63+
import org.apache.fluss.exception.PreWriteBufferFullException;
6364
import org.apache.fluss.exception.RecordTooLargeException;
6465
import org.apache.fluss.exception.RetriableAuthenticationException;
6566
import org.apache.fluss.exception.SchemaNotExistException;
@@ -228,7 +229,9 @@ public enum Errors {
228229
INVALID_ALTER_TABLE_EXCEPTION(
229230
56, "The alter table is invalid.", InvalidAlterTableException::new),
230231
DELETION_DISABLED_EXCEPTION(
231-
57, "Deletion operations are disabled on this table.", DeletionDisabledException::new);
232+
57, "Deletion operations are disabled on this table.", DeletionDisabledException::new),
233+
PRE_WRITE_BUFFER_FULL_EXCEPTION(
234+
58, "The pre-write buffer is full.", PreWriteBufferFullException::new);
232235

233236
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
234237

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import org.apache.fluss.metadata.TableBucket;
3333
import org.apache.fluss.metadata.TableInfo;
3434
import org.apache.fluss.metadata.TablePath;
35+
import org.apache.fluss.metrics.MetricNames;
3536
import org.apache.fluss.server.TabletManagerBase;
37+
import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool;
3638
import org.apache.fluss.server.kv.rowmerger.RowMerger;
3739
import org.apache.fluss.server.log.LogManager;
3840
import org.apache.fluss.server.log.LogTablet;
@@ -76,6 +78,8 @@ public final class KvManager extends TabletManagerBase {
7678

7779
private final Map<TableBucket, KvTablet> currentKvs = MapUtils.newConcurrentHashMap();
7880

81+
private final KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool;
82+
7983
/**
8084
* For arrow log format. The buffer allocator to allocate memory for arrow write batch of
8185
* changelog records.
@@ -104,7 +108,9 @@ private KvManager(
104108
this.zkClient = zkClient;
105109
this.remoteKvDir = FlussPaths.remoteKvDir(conf);
106110
this.remoteFileSystem = remoteKvDir.getFileSystem();
111+
this.kvPreWriteBufferMemoryPool = new KvPreWriteBufferMemoryPool(conf);
107112
this.serverMetricGroup = tabletServerMetricGroup;
113+
registerMetrics();
108114
}
109115

110116
public static KvManager create(
@@ -124,6 +130,18 @@ public static KvManager create(
124130
tabletServerMetricGroup);
125131
}
126132

133+
private void registerMetrics() {
134+
serverMetricGroup.gauge(
135+
MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_USAGE_SIZE,
136+
kvPreWriteBufferMemoryPool::getTotalUsed);
137+
serverMetricGroup.gauge(
138+
MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE_PER_BUCKET,
139+
kvPreWriteBufferMemoryPool::getMaxMemorySizePerBucket);
140+
serverMetricGroup.gauge(
141+
MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE,
142+
kvPreWriteBufferMemoryPool::getMaxMemorySize);
143+
}
144+
127145
public void startup() {
128146
// should do nothing now
129147
}
@@ -140,6 +158,7 @@ public void shutdown() {
140158
}
141159
arrowBufferAllocator.close();
142160
memorySegmentPool.close();
161+
kvPreWriteBufferMemoryPool.close();
143162
LOG.info("Shut down KvManager complete.");
144163
}
145164

@@ -186,7 +205,8 @@ public KvTablet getOrCreateKv(
186205
kvFormat,
187206
schema,
188207
merger,
189-
arrowCompressionInfo);
208+
arrowCompressionInfo,
209+
kvPreWriteBufferMemoryPool);
190210
currentKvs.put(tableBucket, tablet);
191211

192212
LOG.info(
@@ -295,7 +315,8 @@ public KvTablet loadKv(File tabletDir) throws Exception {
295315
tableInfo.getTableConfig().getKvFormat(),
296316
tableInfo.getSchema(),
297317
rowMerger,
298-
tableInfo.getTableConfig().getArrowCompressionInfo());
318+
tableInfo.getTableConfig().getArrowCompressionInfo(),
319+
kvPreWriteBufferMemoryPool);
299320
if (this.currentKvs.containsKey(tableBucket)) {
300321
throw new IllegalStateException(
301322
String.format(

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.fluss.row.encode.ValueEncoder;
4343
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
4444
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
45+
import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool;
4546
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
4647
import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
4748
import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
@@ -80,6 +81,7 @@
8081
import java.util.concurrent.locks.ReadWriteLock;
8182
import java.util.concurrent.locks.ReentrantReadWriteLock;
8283

84+
import static org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.toByteArray;
8385
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
8486
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
8587

@@ -108,6 +110,7 @@ public final class KvTablet {
108110
// defines how to merge rows on the same primary key
109111
private final RowMerger rowMerger;
110112
private final ArrowCompressionInfo arrowCompressionInfo;
113+
private final KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool;
111114

112115
/**
113116
* The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been
@@ -132,14 +135,21 @@ private KvTablet(
132135
KvFormat kvFormat,
133136
Schema schema,
134137
RowMerger rowMerger,
135-
ArrowCompressionInfo arrowCompressionInfo) {
138+
ArrowCompressionInfo arrowCompressionInfo,
139+
KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool) {
136140
this.physicalPath = physicalPath;
137141
this.tableBucket = tableBucket;
138142
this.logTablet = logTablet;
139143
this.kvTabletDir = kvTabletDir;
140144
this.rocksDBKv = rocksDBKv;
141145
this.writeBatchSize = writeBatchSize;
142-
this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter(), serverMetricGroup);
146+
this.kvPreWriteBufferMemoryPool = kvPreWriteBufferMemoryPool;
147+
this.kvPreWriteBuffer =
148+
new KvPreWriteBuffer(
149+
tableBucket,
150+
createKvBatchWriter(),
151+
kvPreWriteBufferMemoryPool,
152+
serverMetricGroup);
143153
this.logFormat = logFormat;
144154
this.arrowWriterProvider = new ArrowWriterPool(arrowBufferAllocator);
145155
this.memorySegmentPool = memorySegmentPool;
@@ -159,7 +169,8 @@ public static KvTablet create(
159169
KvFormat kvFormat,
160170
Schema schema,
161171
RowMerger rowMerger,
162-
ArrowCompressionInfo arrowCompressionInfo)
172+
ArrowCompressionInfo arrowCompressionInfo,
173+
KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool)
163174
throws IOException {
164175
Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
165176
FlussPaths.parseTabletDir(kvTabletDir);
@@ -175,7 +186,8 @@ public static KvTablet create(
175186
kvFormat,
176187
schema,
177188
rowMerger,
178-
arrowCompressionInfo);
189+
arrowCompressionInfo,
190+
kvPreWriteBufferMemoryPool);
179191
}
180192

181193
public static KvTablet create(
@@ -190,7 +202,8 @@ public static KvTablet create(
190202
KvFormat kvFormat,
191203
Schema schema,
192204
RowMerger rowMerger,
193-
ArrowCompressionInfo arrowCompressionInfo)
205+
ArrowCompressionInfo arrowCompressionInfo,
206+
KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool)
194207
throws IOException {
195208
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir);
196209
return new KvTablet(
@@ -207,7 +220,8 @@ public static KvTablet create(
207220
kvFormat,
208221
schema,
209222
rowMerger,
210-
arrowCompressionInfo);
223+
arrowCompressionInfo,
224+
kvPreWriteBufferMemoryPool);
211225
}
212226

213227
private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir)
@@ -247,6 +261,10 @@ public long getFlushedLogOffset() {
247261
return flushedLogOffset;
248262
}
249263

264+
public long getPreWriteBufferMemoryPoolUsage() {
265+
return kvPreWriteBufferMemoryPool.getPerBucketUsage(tableBucket);
266+
}
267+
250268
/**
251269
* Put the KvRecordBatch into the kv storage, and return the appended wal log info.
252270
*
@@ -469,7 +487,9 @@ private byte[] getFromBufferOrKv(KvPreWriteBuffer.Key key) throws IOException {
469487
if (value == null) {
470488
return rocksDBKv.get(key.get());
471489
}
472-
return value.get();
490+
491+
// TODO maybe we can use ByteBuffer to avoid copy
492+
return toByteArray(value.get());
473493
}
474494

475495
public List<byte[]> multiGet(List<byte[]> keys) throws IOException {

0 commit comments

Comments
 (0)