Skip to content

Commit 7560223

Browse files
committed
[kv] Introduce a shared memory cache for PrewriteBuffer between different replica
1 parent 5049663 commit 7560223

File tree

14 files changed

+552
-57
lines changed

14 files changed

+552
-57
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1553,6 +1553,25 @@ public class ConfigOptions {
15531553
"The max size of the consumed memory for RocksDB batch write, "
15541554
+ "will flush just based on item count if this config set to 0.");
15551555

1556+
public static final ConfigOption<MemorySize> KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZE =
1557+
key("kv.preWriteBuffer.memory-pool-size")
1558+
.memoryType()
1559+
.defaultValue(MemorySize.parse("2gb"))
1560+
.withDescription(
1561+
"The max size of the off-heap memory pool for pre-write buffer. "
1562+
+ "All buckets will share the same buffer pool in one tabletServer. "
1563+
+ "If the used memory exceeds this config, PreWriteBufferFullException will be thrown. "
1564+
+ "The default value is 2GB.");
1565+
1566+
public static final ConfigOption<MemorySize> KV_PRE_WRITE_BUFFER_MEMORY_POOL_SIZ_PER_BUCKET =
1567+
key("kv.preWriteBuffer.memory-pool-size-per-bucket")
1568+
.memoryType()
1569+
.defaultValue(MemorySize.parse("200mb"))
1570+
.withDescription(
1571+
"The max size of the off-heap memory buffer pool for pre-write buffer of each table bucket. "
1572+
+ "If there is a table bucket who use the buffer size exceeds this config, "
1573+
+ "PreWriteBufferFullException will be thrown. The default value is 200MB.");
1574+
15561575
// --------------------------------------------------------------------------
15571576
// Provided configurable ColumnFamilyOptions within Fluss
15581577
// --------------------------------------------------------------------------
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.exception;
19+
20+
/** Pre-write buffer is full. */
21+
public class PreWriteBufferFullException extends RetriableException {
22+
private static final long serialVersionUID = 1L;
23+
24+
public PreWriteBufferFullException(String message) {
25+
super(message);
26+
}
27+
}

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ public class MetricNames {
124124
"preWriteBufferTruncateAsDuplicatedPerSecond";
125125
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
126126
"preWriteBufferTruncateAsErrorPerSecond";
127+
public static final String KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE =
128+
"preWriteBufferMemoryPoolMaxSize";
129+
public static final String KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE_PER_BUCKET =
130+
"preWriteBufferMemoryPoolMaxSizePerBucket";
131+
public static final String KV_PRE_WRITE_BUFFER_MEMORY_POOL_USAGE_SIZE =
132+
"preWriteBufferMemoryPoolUsageSize";
127133

128134
// --------------------------------------------------------------------------------------------
129135
// metrics for table bucket

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.fluss.exception.OutOfOrderSequenceException;
6161
import org.apache.fluss.exception.PartitionAlreadyExistsException;
6262
import org.apache.fluss.exception.PartitionNotExistException;
63+
import org.apache.fluss.exception.PreWriteBufferFullException;
6364
import org.apache.fluss.exception.RecordTooLargeException;
6465
import org.apache.fluss.exception.RetriableAuthenticationException;
6566
import org.apache.fluss.exception.SchemaNotExistException;
@@ -228,7 +229,9 @@ public enum Errors {
228229
INVALID_ALTER_TABLE_EXCEPTION(
229230
56, "The alter table is invalid.", InvalidAlterTableException::new),
230231
DELETION_DISABLED_EXCEPTION(
231-
57, "Deletion operations are disabled on this table.", DeletionDisabledException::new);
232+
57, "Deletion operations are disabled on this table.", DeletionDisabledException::new),
233+
PRE_WRITE_BUFFER_FULL_EXCEPTION(
234+
58, "The pre-write buffer is full.", PreWriteBufferFullException::new);
232235

233236
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
234237

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import org.apache.fluss.metadata.TableBucket;
3333
import org.apache.fluss.metadata.TableInfo;
3434
import org.apache.fluss.metadata.TablePath;
35+
import org.apache.fluss.metrics.MetricNames;
3536
import org.apache.fluss.server.TabletManagerBase;
37+
import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool;
3638
import org.apache.fluss.server.kv.rowmerger.RowMerger;
3739
import org.apache.fluss.server.log.LogManager;
3840
import org.apache.fluss.server.log.LogTablet;
@@ -76,6 +78,8 @@ public final class KvManager extends TabletManagerBase {
7678

7779
private final Map<TableBucket, KvTablet> currentKvs = MapUtils.newConcurrentHashMap();
7880

81+
private final KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool;
82+
7983
/**
8084
* For arrow log format. The buffer allocator to allocate memory for arrow write batch of
8185
* changelog records.
@@ -104,7 +108,9 @@ private KvManager(
104108
this.zkClient = zkClient;
105109
this.remoteKvDir = FlussPaths.remoteKvDir(conf);
106110
this.remoteFileSystem = remoteKvDir.getFileSystem();
111+
this.kvPreWriteBufferMemoryPool = new KvPreWriteBufferMemoryPool(conf);
107112
this.serverMetricGroup = tabletServerMetricGroup;
113+
registerMetrics();
108114
}
109115

110116
public static KvManager create(
@@ -124,6 +130,18 @@ public static KvManager create(
124130
tabletServerMetricGroup);
125131
}
126132

133+
private void registerMetrics() {
134+
serverMetricGroup.gauge(
135+
MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_USAGE_SIZE,
136+
kvPreWriteBufferMemoryPool::getTotalUsed);
137+
serverMetricGroup.gauge(
138+
MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE_PER_BUCKET,
139+
kvPreWriteBufferMemoryPool::getMaxMemorySizePerBucket);
140+
serverMetricGroup.gauge(
141+
MetricNames.KV_PRE_WRITE_BUFFER_MEMORY_POOL_MAX_SIZE,
142+
kvPreWriteBufferMemoryPool::getMaxMemorySize);
143+
}
144+
127145
public void startup() {
128146
// should do nothing now
129147
}
@@ -140,6 +158,7 @@ public void shutdown() {
140158
}
141159
arrowBufferAllocator.close();
142160
memorySegmentPool.close();
161+
kvPreWriteBufferMemoryPool.close();
143162
LOG.info("Shut down KvManager complete.");
144163
}
145164

@@ -185,7 +204,8 @@ public KvTablet getOrCreateKv(
185204
kvFormat,
186205
merger,
187206
arrowCompressionInfo,
188-
schemaGetter);
207+
schemaGetter,
208+
kvPreWriteBufferMemoryPool);
189209
currentKvs.put(tableBucket, tablet);
190210

191211
LOG.info(
@@ -293,7 +313,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
293313
tableInfo.getTableConfig().getKvFormat(),
294314
rowMerger,
295315
tableInfo.getTableConfig().getArrowCompressionInfo(),
296-
schemaGetter);
316+
schemaGetter,
317+
kvPreWriteBufferMemoryPool);
297318
if (this.currentKvs.containsKey(tableBucket)) {
298319
throw new IllegalStateException(
299320
String.format(

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.fluss.row.encode.ValueDecoder;
4747
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
4848
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
49+
import org.apache.fluss.server.kv.prewrite.KvPreWriteBufferMemoryPool;
4950
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
5051
import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
5152
import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
@@ -84,6 +85,7 @@
8485
import java.util.concurrent.locks.ReadWriteLock;
8586
import java.util.concurrent.locks.ReentrantReadWriteLock;
8687

88+
import static org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.toByteArray;
8789
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
8890
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
8991

@@ -111,6 +113,7 @@ public final class KvTablet {
111113
// defines how to merge rows on the same primary key
112114
private final RowMerger rowMerger;
113115
private final ArrowCompressionInfo arrowCompressionInfo;
116+
private final KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool;
114117

115118
private final SchemaGetter schemaGetter;
116119

@@ -137,14 +140,21 @@ private KvTablet(
137140
KvFormat kvFormat,
138141
RowMerger rowMerger,
139142
ArrowCompressionInfo arrowCompressionInfo,
140-
SchemaGetter schemaGetter) {
143+
SchemaGetter schemaGetter,
144+
KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool) {
141145
this.physicalPath = physicalPath;
142146
this.tableBucket = tableBucket;
143147
this.logTablet = logTablet;
144148
this.kvTabletDir = kvTabletDir;
145149
this.rocksDBKv = rocksDBKv;
146150
this.writeBatchSize = writeBatchSize;
147-
this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter(), serverMetricGroup);
151+
this.kvPreWriteBufferMemoryPool = kvPreWriteBufferMemoryPool;
152+
this.kvPreWriteBuffer =
153+
new KvPreWriteBuffer(
154+
tableBucket,
155+
createKvBatchWriter(),
156+
kvPreWriteBufferMemoryPool,
157+
serverMetricGroup);
148158
this.logFormat = logFormat;
149159
this.arrowWriterProvider = new ArrowWriterPool(arrowBufferAllocator);
150160
this.memorySegmentPool = memorySegmentPool;
@@ -164,7 +174,8 @@ public static KvTablet create(
164174
KvFormat kvFormat,
165175
RowMerger rowMerger,
166176
ArrowCompressionInfo arrowCompressionInfo,
167-
SchemaGetter schemaGetter)
177+
SchemaGetter schemaGetter,
178+
KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool)
168179
throws IOException {
169180
Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
170181
FlussPaths.parseTabletDir(kvTabletDir);
@@ -180,7 +191,8 @@ public static KvTablet create(
180191
kvFormat,
181192
rowMerger,
182193
arrowCompressionInfo,
183-
schemaGetter);
194+
schemaGetter,
195+
kvPreWriteBufferMemoryPool);
184196
}
185197

186198
public static KvTablet create(
@@ -195,7 +207,8 @@ public static KvTablet create(
195207
KvFormat kvFormat,
196208
RowMerger rowMerger,
197209
ArrowCompressionInfo arrowCompressionInfo,
198-
SchemaGetter schemaGetter)
210+
SchemaGetter schemaGetter,
211+
KvPreWriteBufferMemoryPool kvPreWriteBufferMemoryPool)
199212
throws IOException {
200213
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir);
201214
return new KvTablet(
@@ -212,7 +225,8 @@ public static KvTablet create(
212225
kvFormat,
213226
rowMerger,
214227
arrowCompressionInfo,
215-
schemaGetter);
228+
schemaGetter,
229+
kvPreWriteBufferMemoryPool);
216230
}
217231

218232
private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir)
@@ -252,6 +266,10 @@ public long getFlushedLogOffset() {
252266
return flushedLogOffset;
253267
}
254268

269+
public long getPreWriteBufferMemoryPoolUsage() {
270+
return kvPreWriteBufferMemoryPool.getPerBucketUsage(tableBucket);
271+
}
272+
255273
/**
256274
* Put the KvRecordBatch into the kv storage, and return the appended wal log info.
257275
*
@@ -518,7 +536,9 @@ private byte[] getFromBufferOrKv(KvPreWriteBuffer.Key key) throws IOException {
518536
if (value == null) {
519537
return rocksDBKv.get(key.get());
520538
}
521-
return value.get();
539+
540+
// TODO maybe we can use ByteBuffer to avoid copy
541+
return toByteArray(value.get());
522542
}
523543

524544
public List<byte[]> multiGet(List<byte[]> keys) throws IOException {

0 commit comments

Comments
 (0)