
Commit 2da48d5

[client] Support dynamic estimate write batch size (#976)
1 parent abe5966 commit 2da48d5

File tree

5 files changed: +205 -5 lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/DynamicWriteBatchSizeEstimator.java
fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
fluss-client/src/test/java/com/alibaba/fluss/client/write/DynamicWriteBatchSizeEstimatorTest.java
fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
website/docs/engine-flink/options.md
fluss-client/src/main/java/com/alibaba/fluss/client/write/DynamicWriteBatchSizeEstimator.java

Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.client.write;
+
+import com.alibaba.fluss.annotation.Internal;
+import com.alibaba.fluss.metadata.PhysicalTablePath;
+import com.alibaba.fluss.utils.MapUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/** An estimator to estimate the buffer usage of a write batch. */
+@Internal
+@ThreadSafe
+public class DynamicWriteBatchSizeEstimator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteBatchSizeEstimator.class);
+
+    private static final double RATIO_TO_INCREASE_BATCH_SIZE = 0.9d;
+    private static final double RATIO_TO_DECREASE_BATCH_SIZE = 0.5d;
+
+    private final int maxBatchSize;
+    private final int pageSize;
+    private final boolean dynamicBatchSizeEnabled;
+
+    private final ConcurrentHashMap<PhysicalTablePath, Integer> estimatedBatchSizeMap;
+
+    public DynamicWriteBatchSizeEstimator(
+            boolean dynamicBatchSizeEnabled, int maxBatchSize, int pageSize) {
+        this.dynamicBatchSizeEnabled = dynamicBatchSizeEnabled;
+
+        if (dynamicBatchSizeEnabled) {
+            this.estimatedBatchSizeMap = MapUtils.newConcurrentHashMap();
+        } else {
+            this.estimatedBatchSizeMap = null;
+        }
+
+        this.maxBatchSize = maxBatchSize;
+        this.pageSize = pageSize;
+    }
+
+    public void updateEstimation(PhysicalTablePath physicalTablePath, int observedBatchSize) {
+        if (!dynamicBatchSizeEnabled) {
+            return;
+        }
+
+        int estimatedBatchSize =
+                estimatedBatchSizeMap.getOrDefault(physicalTablePath, maxBatchSize);
+        int newEstimatedBatchSize;
+        if (observedBatchSize >= estimatedBatchSize
+                || observedBatchSize > estimatedBatchSize * RATIO_TO_INCREASE_BATCH_SIZE) {
+            // increase by 10%
+            newEstimatedBatchSize = Math.min((int) (estimatedBatchSize * 1.1), maxBatchSize);
+        } else if (observedBatchSize < estimatedBatchSize * RATIO_TO_INCREASE_BATCH_SIZE
+                && observedBatchSize > estimatedBatchSize * RATIO_TO_DECREASE_BATCH_SIZE) {
+            // decrease by 5%
+            newEstimatedBatchSize = Math.max((int) (estimatedBatchSize * 0.95), pageSize);
+        } else {
+            // decrease by 10%
+            newEstimatedBatchSize = Math.max((int) (estimatedBatchSize * 0.9), pageSize);
+        }
+
+        estimatedBatchSizeMap.put(physicalTablePath, newEstimatedBatchSize);
+        LOG.debug(
+                "Set estimated batch size for {} from {} to {}",
+                physicalTablePath,
+                estimatedBatchSize,
+                newEstimatedBatchSize);
+    }
+
+    public int getEstimatedBatchSize(PhysicalTablePath physicalTablePath) {
+        return dynamicBatchSizeEnabled
+                ? estimatedBatchSizeMap.getOrDefault(physicalTablePath, maxBatchSize)
+                : maxBatchSize;
+    }
+}
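
A quick way to read the update rule: an observed batch at or above 90% of the current estimate grows it by 10% (capped at maxBatchSize), a batch between 50% and 90% shrinks it by 5%, and anything at or below 50% shrinks it by 10% (floored at pageSize). The standalone sketch below is not part of the commit; it just traces one table's estimate through the three regimes, using the client defaults of a 2 MB max batch size and a 128 KB page size.

    // Illustration only: trace the three adjustment regimes of the estimator.
    void traceEstimator(PhysicalTablePath tablePath) {
        // maxBatchSize = 2 MB, pageSize = 128 KB (the client defaults).
        DynamicWriteBatchSizeEstimator estimator =
                new DynamicWriteBatchSizeEstimator(true, 2 * 1024 * 1024, 128 * 1024);

        // Before any update, the estimate defaults to maxBatchSize: 2,097,152 bytes.
        int initial = estimator.getEstimatedBatchSize(tablePath);

        // Observed 2 MB >= estimate -> grow by 10%, capped at maxBatchSize: stays 2 MB.
        estimator.updateEstimation(tablePath, 2 * 1024 * 1024);

        // Observed 1.5 MB sits between 50% and 90% of 2 MB -> shrink by 5% to 1,992,294.
        estimator.updateEstimation(tablePath, 1536 * 1024);

        // Observed 100 KB is below 50% of the estimate -> shrink by 10% to 1,793,064;
        // repeated small batches keep shrinking it, but never below pageSize (131,072).
        estimator.updateEstimation(tablePath, 100 * 1024);
    }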

fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java

Lines changed: 20 additions & 5 deletions

@@ -79,7 +79,6 @@ public final class RecordAccumulator {
     private final AtomicInteger flushesInProgress;
     private final AtomicInteger appendsInProgress;
     private final int batchSize;
-    private final int pagesPerBatch;

     /**
      * An artificial delay time to add before declaring a records instance that isn't full ready for
@@ -110,6 +109,7 @@ public final class RecordAccumulator {

     private final IdempotenceManager idempotenceManager;
     private final Clock clock;
+    private final DynamicWriteBatchSizeEstimator batchSizeEstimator;

     // TODO add retryBackoffMs to retry the produce request upon receiving an error.
     // TODO add deliveryTimeoutMs to report success or failure on record delivery.
@@ -132,11 +132,15 @@ public final class RecordAccumulator {
                 Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes());

         this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf);
-        this.pagesPerBatch = Math.max(1, MathUtils.ceilDiv(batchSize, writerBufferPool.pageSize()));
         this.bufferAllocator = new RootAllocator(Long.MAX_VALUE);
         this.arrowWriterPool = new ArrowWriterPool(bufferAllocator);
         this.incomplete = new IncompleteBatches();
         this.nodesDrainIndex = new HashMap<>();
+        this.batchSizeEstimator =
+                new DynamicWriteBatchSizeEstimator(
+                        conf.get(ConfigOptions.CLIENT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED),
+                        batchSize,
+                        (int) conf.get(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE).getBytes());
         this.idempotenceManager = idempotenceManager;
         this.clock = clock;
         registerMetrics(writerMetricGroup);
@@ -200,7 +204,7 @@ public RecordAppendResult append(
                 return new RecordAppendResult(true, false, true);
             }

-            memorySegments = allocateMemorySegments(writeRecord);
+            memorySegments = allocateMemorySegments(writeRecord, physicalTablePath);
             synchronized (dq) {
                 RecordAppendResult appendResult =
                         appendNewBatch(
@@ -421,7 +425,15 @@ public Set<PhysicalTablePath> getPhysicalTablePathsInBatches() {
         return writeBatches.keySet();
     }

-    private List<MemorySegment> allocateMemorySegments(WriteRecord writeRecord) throws IOException {
+    private List<MemorySegment> allocateMemorySegments(
+            WriteRecord writeRecord, PhysicalTablePath physicalTablePath) throws IOException {
+        int pagesPerBatch =
+                Math.max(
+                        1,
+                        MathUtils.ceilDiv(
+                                batchSizeEstimator.getEstimatedBatchSize(physicalTablePath),
+                                writerBufferPool.pageSize()));
+
         if (writeRecord.getWriteFormat() == WriteFormat.ARROW_LOG) {
             // pre-allocate a batch memory size for Arrow, if it is not sufficient during batching,
             // it will allocate memory from heap
@@ -714,7 +726,10 @@ private List<ReadyWriteBatch> drainBatchesForOneNode(
                     // the rest of the work by processing outside the lock close() is particularly expensive
                     checkNotNull(batch, "batch should not be null");
                     batch.close();
-                    size += batch.estimatedSizeInBytes();
+                    int currentBatchSize = batch.estimatedSizeInBytes();
+                    size += currentBatchSize;
+                    batchSizeEstimator.updateEstimation(physicalTablePath, currentBatchSize);
+
                     ready.add(new ReadyWriteBatch(tableBucket, batch));
                     // mark the batch as drained.
                     batch.drained(System.currentTimeMillis());
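
The net effect of the RecordAccumulator change: pagesPerBatch is no longer a constant derived once from client.writer.batch-size, but is re-derived on every append from the current per-table estimate, and every drained batch feeds its observed size back into the estimator. A standalone sketch of the page arithmetic (not from the commit; ceilDiv here re-implements a plain ceiling division like MathUtils.ceilDiv is expected to compute):

    // Illustration only: the per-append page calculation.
    static int pagesPerBatch(int estimatedBatchSize, int pageSize) {
        int pages = (estimatedBatchSize + pageSize - 1) / pageSize; // ceil(a / b)
        return Math.max(1, pages);
    }

    // With the default 128 KB page size:
    // pagesPerBatch(2 * 1024 * 1024, 128 * 1024) -> 16 pages (fixed 2 MB batch)
    // pagesPerBatch(300 * 1024, 128 * 1024)      -> 3 pages  (a shrunken estimate)

So a table or partition whose batches have shrunk pins far fewer buffer pages per pre-allocated Arrow batch, which is where the memory-utilization claim in the new config description comes from.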
fluss-client/src/test/java/com/alibaba/fluss/client/write/DynamicWriteBatchSizeEstimatorTest.java

Lines changed: 76 additions & 0 deletions

@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.client.write;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DynamicWriteBatchSizeEstimator}. */
+public class DynamicWriteBatchSizeEstimatorTest {
+
+    private DynamicWriteBatchSizeEstimator estimator;
+
+    @BeforeEach
+    public void setup() {
+        estimator = new DynamicWriteBatchSizeEstimator(true, 1000, 100);
+    }
+
+    @Test
+    void testEstimator() {
+        assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(1000);
+        estimator = new DynamicWriteBatchSizeEstimator(false, 1000, 100);
+        assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(1000);
+
+        estimator = new DynamicWriteBatchSizeEstimator(true, 1000, 100);
+        // test decrease 10%
+        estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 450);
+        assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(900);
+
+        // test decrease 5%
+        estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, (int) (900 * 0.9) - 10);
+        assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(855);
+
+        // test increase 10%
+        estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 852);
+        assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH))
+                .isEqualTo((int) (855 * 1.1));
+    }
+
+    @Test
+    void testMinDecreaseToPageSize() {
+        int estimatedSize = estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH);
+        estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 1000);
+        while (estimatedSize > 100) {
+            estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, (int) (estimatedSize * 0.5) - 10);
+            estimatedSize = estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH);
+        }

+        assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(100);
+        estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 0);
+        assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(100);
+    }
+
+    @Test
+    void testMaxIncreaseToMaxBatchSize() {
+        assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(1000);
+        estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 2000);
+        assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(1000);
+    }
+}

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 15 additions & 0 deletions

@@ -813,6 +813,21 @@ public class ConfigOptions {
                                     "The writer or walBuilder will attempt to batch records together into one batch for"
                                             + " the same bucket. This helps performance on both the client and the server.");

+    public static final ConfigOption<Boolean> CLIENT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED =
+            key("client.writer.dynamic-batch-size.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Controls whether the client writer dynamically adjusts the batch size based on actual write throughput. Enabled by default. "
+                                    + "With dynamic batch sizing enabled, the writer adapts memory allocation per batch according to historical write sizes for the target table or partition. This ensures better memory utilization and performance under varying throughput conditions. The dynamic batch size is bounded: it will not exceed `"
+                                    + CLIENT_WRITER_BATCH_SIZE.key()
+                                    + "`, nor fall below `"
+                                    + CLIENT_WRITER_BUFFER_PAGE_SIZE.key()
+                                    + "`. "
+                                    + "When disabled, the writer uses a fixed batch size (`"
+                                    + CLIENT_WRITER_BATCH_SIZE.key()
+                                    + "`) for all batches, which may lead to frequent memory waits and suboptimal write performance if the incoming data rate is inconsistent across partitions.");
+
     public static final ConfigOption<Duration> CLIENT_WRITER_BATCH_TIMEOUT =
             key("client.writer.batch-timeout")
                     .durationType()
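
A hedged usage sketch: RecordAccumulator reads the flag via conf.get(...), so disabling dynamic batch sizing should only require setting the option on the client configuration. This assumes Fluss's Configuration exposes a set(ConfigOption<T>, T) counterpart to get, which is not shown in this diff.

    // Sketch (assumes Configuration#set exists as the counterpart of the
    // conf.get(...) calls shown in RecordAccumulator above).
    Configuration conf = new Configuration();
    conf.set(ConfigOptions.CLIENT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED, false);
    // Every batch then reserves the fixed client.writer.batch-size (2 MB by default).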

website/docs/engine-flink/options.md

Lines changed: 1 addition & 0 deletions

@@ -143,6 +143,7 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d');
 | client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers (`client.writer.buffer.memory-size`). |
 | client.writer.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of client.writer.buffer.page-size. It must be greater than or equal to client.writer.buffer.page-size. This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments. |
 | client.writer.batch-size | MemorySize | 2mb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. |
+| client.writer.dynamic-batch-size.enabled | Boolean | true | Controls whether the client writer dynamically adjusts the batch size based on actual write throughput. Enabled by default. With dynamic batch sizing enabled, the writer adapts memory allocation per batch according to historical write sizes for the target table or partition. This ensures better memory utilization and performance under varying throughput conditions. The dynamic batch size is bounded: it will not exceed `client.writer.batch-size`, nor fall below `client.writer.buffer.page-size`. When disabled, the writer uses a fixed batch size (`client.writer.batch-size`) for all batches, which may lead to frequent memory waits and suboptimal write performance if the incoming data rate is inconsistent across partitions. |
 | client.writer.buffer.wait-timeout | Duration | 2^(63)-1ns | Defines how long the writer will block when waiting for segments to become available. |
 | client.writer.batch-timeout | Duration | 100ms | The writer groups any rows that arrive in between request sends into a single batched request. Normally this occurs only under load when rows arrive faster than they can be sent out. However in some circumstances the writer may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay, that is, rather than immediately sending out a row, the writer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get client.writer.batch-size worth of rows for a bucket it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this bucket we will delay for the specified time waiting for more records to show up. |
 | client.writer.bucket.no-key-assigner | Enum | STICKY | The bucket assigner for no key table. For table with bucket key or primary key, we choose a bucket based on a hash of the key. For these table without bucket key and primary key, we can use this option to specify bucket assigner, the candidate assigner is ROUND_ROBIN, STICKY, the default assigner is STICKY.<br/>ROUND_ROBIN: this strategy will assign the bucket id for the input row by round robin.<br/>STICKY: this strategy will assign new bucket id only if the batch changed in record accumulator, otherwise the bucket id will be the same as the front record. |
