Skip to content

Commit ea579fa

Browse files
committed
Addressed PR feedback
1 parent d5ced1c commit ea579fa

File tree

4 files changed

+106
-88
lines changed

4 files changed

+106
-88
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -253,22 +253,24 @@ public class ConfigOptions {
253253
+ SERVER_BUFFER_MEMORY_SIZE.key()
254254
+ "').");
255255

256-
// noDefaultValue() leads to NPE in some code parts, hence we default to 0b
257256
public static final ConfigOption<MemorySize> SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE =
258257
key("server.buffer.per-request-memory-size")
259258
.memoryType()
260-
.defaultValue(MemorySize.parse("0b"))
259+
.defaultValue(MemorySize.parse("16mb"))
261260
.withDescription(
262-
"The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of "
261+
"The minimum number of bytes that will be allocated by the writer rounded down to the closes multiple of "
263262
+ SERVER_BUFFER_PAGE_SIZE.key()
264-
+ " (but at least one page). This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments.");
263+
+ "It must be greater than or equal to "
264+
+ SERVER_BUFFER_PAGE_SIZE.key()
265+
+ ". "
266+
+ "This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments.");
265267

266-
public static final ConfigOption<Long> SERVER_BUFFER_POOL_WAIT_TIMEOUT =
268+
public static final ConfigOption<Duration> SERVER_BUFFER_POOL_WAIT_TIMEOUT =
267269
key("server.buffer.wait-timeout")
268-
.longType()
269-
.defaultValue(Long.MAX_VALUE)
270+
.durationType()
271+
.defaultValue(Duration.ofMillis(Long.MAX_VALUE))
270272
.withDescription(
271-
"Defines how long the buffer pool will block when waiting for segments to become available in ms.");
273+
"Defines how long the buffer pool will block when waiting for segments to become available.");
272274

273275
// ------------------------------------------------------------------
274276
// ZooKeeper Settings
@@ -630,30 +632,32 @@ public class ConfigOptions {
630632
+ CLIENT_WRITER_BUFFER_MEMORY_SIZE.key()
631633
+ "').");
632634

633-
public static final ConfigOption<MemorySize> CLIENT_WRITER_BATCH_SIZE =
634-
key("client.writer.batch-size")
635-
.memoryType()
636-
.defaultValue(MemorySize.parse("2mb"))
637-
.withDescription(
638-
"The writer or walBuilder will attempt to batch records together into one batch for"
639-
+ " the same bucket. This helps performance on both the client and the server.");
640-
641-
// noDefaultValue() leads to NPE in some code parts, hence we default to 0b
642635
public static final ConfigOption<MemorySize> CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE =
643-
key("client.writer.per-request-memory-size")
636+
key("client.writer.buffer.per-request-memory-size")
644637
.memoryType()
645-
.defaultValue(MemorySize.parse("0b"))
638+
.defaultValue(MemorySize.parse("16mb"))
646639
.withDescription(
647640
"The minimum number of bytes that will be allocated by the writer rounded down to the closes multiple of "
648641
+ CLIENT_WRITER_BUFFER_PAGE_SIZE.key()
649-
+ " (but at least one page). This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments.");
642+
+ "It must be greater than or equal to "
643+
+ CLIENT_WRITER_BUFFER_PAGE_SIZE.key()
644+
+ ". "
645+
+ "This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments.");
650646

651-
public static final ConfigOption<Long> CLIENT_WRITER_WAIT_TIMEOUT =
652-
key("client.writer.wait-timeout")
653-
.longType()
654-
.defaultValue(Long.MAX_VALUE)
647+
public static final ConfigOption<Duration> CLIENT_WRITER_BUFFER_WAIT_TIMEOUT =
648+
key("client.writer.buffer.wait-timeout")
649+
.durationType()
650+
.defaultValue(Duration.ofMillis(Long.MAX_VALUE))
655651
.withDescription(
656-
"Defines how long the writer will block when waiting for segments to become available in ms.");
652+
"Defines how long the writer will block when waiting for segments to become available.");
653+
654+
public static final ConfigOption<MemorySize> CLIENT_WRITER_BATCH_SIZE =
655+
key("client.writer.batch-size")
656+
.memoryType()
657+
.defaultValue(MemorySize.parse("2mb"))
658+
.withDescription(
659+
"The writer or walBuilder will attempt to batch records together into one batch for"
660+
+ " the same bucket. This helps performance on both the client and the server.");
657661

658662
public static final ConfigOption<Duration> CLIENT_WRITER_BATCH_TIMEOUT =
659663
key("client.writer.batch-timeout")

fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.alibaba.fluss.annotation.VisibleForTesting;
2121
import com.alibaba.fluss.config.ConfigOptions;
2222
import com.alibaba.fluss.config.Configuration;
23-
import com.alibaba.fluss.config.MemorySize;
2423
import com.alibaba.fluss.exception.FlussRuntimeException;
2524

2625
import javax.annotation.concurrent.GuardedBy;
@@ -105,25 +104,19 @@ public static LazyMemorySegmentPool createWriterBufferPool(Configuration conf) {
105104
batchSize));
106105
int pageSize = (int) conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes();
107106
long perRequestMemorySize =
108-
conf.getOptional(ConfigOptions.CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE)
109-
.filter(s -> s.getBytes() > 0)
110-
.map(MemorySize::getBytes)
111-
.orElse((long) pageSize);
107+
conf.get(ConfigOptions.CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE).getBytes();
112108
int segmentCount = (int) (totalBytes / pageSize);
113-
long waitTimeout = conf.get(ConfigOptions.CLIENT_WRITER_WAIT_TIMEOUT);
109+
long waitTimeout = conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_WAIT_TIMEOUT).toMillis();
114110
return new LazyMemorySegmentPool(segmentCount, pageSize, waitTimeout, perRequestMemorySize);
115111
}
116112

117113
public static LazyMemorySegmentPool createServerBufferPool(Configuration conf) {
118114
long totalBytes = conf.get(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE).getBytes();
119115
int pageSize = (int) conf.get(ConfigOptions.SERVER_BUFFER_PAGE_SIZE).getBytes();
120116
long perRequestMemorySize =
121-
conf.getOptional(ConfigOptions.SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE)
122-
.filter(s -> s.getBytes() > 0)
123-
.map(MemorySize::getBytes)
124-
.orElse((long) pageSize);
117+
conf.get(ConfigOptions.SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE).getBytes();
125118
int segmentCount = (int) (totalBytes / pageSize);
126-
long waitTimeout = conf.get(ConfigOptions.SERVER_BUFFER_POOL_WAIT_TIMEOUT);
119+
long waitTimeout = conf.get(ConfigOptions.SERVER_BUFFER_POOL_WAIT_TIMEOUT).toMillis();
127120
return new LazyMemorySegmentPool(segmentCount, pageSize, waitTimeout, perRequestMemorySize);
128121
}
129122

@@ -168,8 +161,11 @@ private List<MemorySegment> drain(int numPages) {
168161
@VisibleForTesting
169162
protected void lazilyAllocatePages(int required) {
170163
if (cachePages.size() < required) {
171-
int numPages =
172-
Math.min(freePages(), Math.max(required - cachePages.size(), perRequestPages));
164+
int minAllocatePages = required - cachePages.size();
165+
int maxAllocatePages = freePages() - cachePages.size();
166+
// try to allocate more pages than minAllocatePages to have better CPU cache
167+
int numPages = Math.min(maxAllocatePages, Math.max(minAllocatePages, perRequestPages));
168+
173169
for (int i = 0; i < numPages; i++) {
174170
cachePages.add(MemorySegment.allocateHeapMemory(pageSize));
175171
}

fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ void testNewPagesAreAllocatedWhenCachedPagesDoNotSuffice() throws Exception {
111111
}
112112

113113
@Test
114-
void testPerRequestMemorySize() throws Exception {
114+
void testPerRequestMemorySizeAllocatesCorrectPageNumber() throws Exception {
115115
LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(10, 512, Long.MAX_VALUE, 2048);
116116
List<MemorySegment> segments = new ArrayList<>();
117117
// should allocate 4 new pages (1 used, 3 cached)
@@ -132,6 +132,24 @@ void testPerRequestMemorySize() throws Exception {
132132
assertThat(pool.getAllCachePages().size()).isEqualTo(0);
133133
}
134134

135+
@Test
136+
void testPerRequestMemorySizeRespectsTotalPageLimit() throws Exception {
137+
LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(5, 512, Long.MAX_VALUE, 2048);
138+
pool.allocatePages(3);
139+
// should allocate 4 new pages (3 used, 1 cached, 2 more available)
140+
assertThat(pool.getAllCachePages().size()).isEqualTo(1);
141+
assertThat(pool.availableMemory()).isEqualTo(1024);
142+
assertThat(pool.freePages()).isEqualTo(2);
143+
// should trigger an allocation request; should use the existing cached page and just
144+
// allocate 1 additional one and not 2048/512=4
145+
pool.allocatePages(2);
146+
// check if page limit is respected
147+
assertThat(pool.getAllCachePages().size()).isEqualTo(0);
148+
// there should also be no further pages available
149+
assertThat(pool.availableMemory()).isEqualTo(0);
150+
assertThat(pool.freePages()).isEqualTo(0);
151+
}
152+
135153
@Test
136154
void testCloseClearsCachedPages() throws Exception {
137155
LazyMemorySegmentPool pool = buildLazyMemorySegmentSource(1, 64, 100, 64);
@@ -463,7 +481,7 @@ void testCreateReaderBufferPoolWithCustomConfig() throws IOException {
463481
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("64kb"));
464482
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, MemorySize.parse("2kb"));
465483
conf.set(ConfigOptions.CLIENT_WRITER_PER_REQUEST_MEMORY_SIZE, MemorySize.parse("2kb"));
466-
conf.set(ConfigOptions.CLIENT_WRITER_WAIT_TIMEOUT, 1000L);
484+
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_WAIT_TIMEOUT, Duration.ofMillis(1000L));
467485

468486
LazyMemorySegmentPool pool = LazyMemorySegmentPool.createWriterBufferPool(conf);
469487
assertThat(pool).isNotNull();
@@ -520,7 +538,7 @@ void testCreateServerBufferPoolWithCustomConfig() throws IOException {
520538
conf.set(ConfigOptions.SERVER_BUFFER_MEMORY_SIZE, MemorySize.parse("128kb"));
521539
conf.set(ConfigOptions.SERVER_BUFFER_PAGE_SIZE, MemorySize.parse("2kb"));
522540
conf.set(ConfigOptions.SERVER_BUFFER_PER_REQUEST_MEMORY_SIZE, MemorySize.parse("2kb"));
523-
conf.set(ConfigOptions.SERVER_BUFFER_POOL_WAIT_TIMEOUT, 1000L);
541+
conf.set(ConfigOptions.SERVER_BUFFER_POOL_WAIT_TIMEOUT, Duration.ofMillis(1000L));
524542

525543
LazyMemorySegmentPool pool = LazyMemorySegmentPool.createServerBufferPool(conf);
526544
assertThat(pool).isNotNull();

0 commit comments

Comments
 (0)