Skip to content

Commit 21f5a61

Browse files
authored
[hotfix] Adjust the writeBatch estimate logic to avoid buffer size always tending to decrease (#1076)
1 parent 1d74fbb commit 21f5a61

File tree

2 files changed

+20
-20
lines changed

2 files changed

+20
-20
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/DynamicWriteBatchSizeEstimator.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class DynamicWriteBatchSizeEstimator {
3434

3535
private static final Logger LOG = LoggerFactory.getLogger(DynamicWriteBatchSizeEstimator.class);
3636

37-
private static final double RATIO_TO_INCREASE_BATCH_SIZE = 0.9d;
37+
private static final double RATIO_TO_INCREASE_BATCH_SIZE = 0.8d;
3838
private static final double RATIO_TO_DECREASE_BATCH_SIZE = 0.5d;
3939
private final int maxBatchSize;
4040
private final int pageSize;
@@ -63,18 +63,14 @@ public void updateEstimation(PhysicalTablePath physicalTablePath, int observedBa
6363

6464
int estimatedBatchSize =
6565
estimatedBatchSizeMap.getOrDefault(physicalTablePath, maxBatchSize);
66-
int newEstimatedBatchSize;
66+
int newEstimatedBatchSize = estimatedBatchSize;
6767
if (observedBatchSize >= estimatedBatchSize
6868
|| observedBatchSize > estimatedBatchSize * RATIO_TO_INCREASE_BATCH_SIZE) {
69-
// To increase 1%
69+
// To increase 10%
7070
newEstimatedBatchSize = Math.min((int) (estimatedBatchSize * 1.1), maxBatchSize);
71-
} else if (observedBatchSize < estimatedBatchSize * RATIO_TO_INCREASE_BATCH_SIZE
72-
&& observedBatchSize > estimatedBatchSize * RATIO_TO_DECREASE_BATCH_SIZE) {
71+
} else if (observedBatchSize < estimatedBatchSize * RATIO_TO_DECREASE_BATCH_SIZE) {
7372
// To decrease 5%
74-
newEstimatedBatchSize = Math.max((int) (estimatedBatchSize * 0.95), pageSize);
75-
} else {
76-
// To decrease 10%
77-
newEstimatedBatchSize = Math.max((int) (estimatedBatchSize * 0.9), pageSize);
73+
newEstimatedBatchSize = Math.max((int) (estimatedBatchSize * 0.95), 2 * pageSize);
7874
}
7975

8076
estimatedBatchSizeMap.put(physicalTablePath, newEstimatedBatchSize);

fluss-client/src/test/java/com/alibaba/fluss/client/write/DynamicWriteBatchSizeEstimatorTest.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,32 +39,36 @@ void testEstimator() {
3939
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(1000);
4040

4141
estimator = new DynamicWriteBatchSizeEstimator(true, 1000, 100);
42-
// test decrease 10%
42+
// test decrease 5%
4343
estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 450);
44-
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(900);
44+
int expectedSize = 950;
45+
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH))
46+
.isEqualTo(expectedSize);
4547

46-
// test decrease 5%
47-
estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, (int) (900 * 0.9) - 10);
48-
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(855);
48+
// test increase 5%
49+
estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 350);
50+
expectedSize = (int) (950 * 0.95);
51+
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH))
52+
.isEqualTo(expectedSize);
4953

50-
// test increase 1%
51-
estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 852);
54+
// test increase 10%
55+
estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 930);
5256
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH))
53-
.isEqualTo((int) (855 * 1.1));
57+
.isEqualTo((int) (expectedSize * 1.1));
5458
}
5559

5660
@Test
5761
void testMinDecreaseToPageSize() {
5862
int estimatedSize = estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH);
5963
estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 1000);
60-
while (estimatedSize > 100) {
64+
while (estimatedSize > 2 * 100) {
6165
estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, (int) (estimatedSize * 0.5) - 10);
6266
estimatedSize = estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH);
6367
}
6468

65-
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(100);
69+
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(200);
6670
estimator.updateEstimation(DATA1_PHYSICAL_TABLE_PATH, 0);
67-
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(100);
71+
assertThat(estimator.getEstimatedBatchSize(DATA1_PHYSICAL_TABLE_PATH)).isEqualTo(200);
6872
}
6973

7074
@Test

0 commit comments

Comments
 (0)