Skip to content

Commit 5e21051

Browse files
authored
[log] Enable ZSTD compression for Arrow log format by default (#496)
1 parent b235303 commit 5e21051

File tree

15 files changed

+119
-151
lines changed

15 files changed

+119
-151
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import java.util.Map;
4949

5050
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.toByteBuffer;
51-
import static com.alibaba.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
51+
import static com.alibaba.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
5252
import static com.alibaba.fluss.record.TestData.DATA2;
5353
import static com.alibaba.fluss.record.TestData.DATA2_ROW_TYPE;
5454
import static com.alibaba.fluss.record.TestData.DATA2_TABLE_ID;
@@ -240,7 +240,7 @@ private MemoryLogRecords genRecordsWithProjection(List<Object[]> objects, Projec
240240

241241
FileLogProjection fileLogProjection = new FileLogProjection();
242242
fileLogProjection.setCurrentProjection(
243-
DATA2_TABLE_ID, rowType, NO_COMPRESSION, projection.getProjectionInOrder());
243+
DATA2_TABLE_ID, rowType, DEFAULT_COMPRESSION, projection.getProjectionInOrder());
244244
ByteBuffer buffer =
245245
toByteBuffer(
246246
fileLogProjection

fluss-client/src/test/java/com/alibaba/fluss/client/write/ArrowLogWriteBatchTest.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package com.alibaba.fluss.client.write;
1818

19-
import com.alibaba.fluss.compression.ArrowCompressionInfo;
20-
import com.alibaba.fluss.compression.ArrowCompressionType;
2119
import com.alibaba.fluss.memory.MemorySegment;
2220
import com.alibaba.fluss.memory.PreAllocatedPagedOutputView;
2321
import com.alibaba.fluss.memory.TestingMemorySegmentPool;
@@ -43,6 +41,7 @@
4341
import java.util.ArrayList;
4442
import java.util.List;
4543

44+
import static com.alibaba.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
4645
import static com.alibaba.fluss.record.LogRecordReadContext.createArrowReadContext;
4746
import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
4847
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
@@ -128,7 +127,7 @@ void testAppendWithPreAllocatedMemorySegments() throws Exception {
128127
DATA1_TABLE_INFO.getSchemaId(),
129128
maxSizeInBytes,
130129
DATA1_ROW_TYPE,
131-
ArrowCompressionInfo.NO_COMPRESSION),
130+
DEFAULT_COMPRESSION),
132131
new PreAllocatedPagedOutputView(memorySegmentList),
133132
System.currentTimeMillis());
134133
assertThat(arrowLogWriteBatch.pooledMemorySegments()).isEqualTo(memorySegmentList);
@@ -176,8 +175,6 @@ void testArrowCompressionRatioEstimated() throws Exception {
176175
}
177176

178177
TableBucket tb = new TableBucket(DATA1_TABLE_ID, bucketId);
179-
ArrowCompressionInfo compressionInfo =
180-
new ArrowCompressionInfo(ArrowCompressionType.ZSTD, 3);
181178

182179
// The compression rate increases slowly, with an increment of only 0.005
183180
// (see COMPRESSION_RATIO_IMPROVING_STEP) each time. Therefore,
@@ -194,7 +191,7 @@ void testArrowCompressionRatioEstimated() throws Exception {
194191
DATA1_TABLE_INFO.getSchemaId(),
195192
maxSizeInBytes,
196193
DATA1_ROW_TYPE,
197-
compressionInfo);
194+
DEFAULT_COMPRESSION);
198195

199196
ArrowLogWriteBatch arrowLogWriteBatch =
200197
new ArrowLogWriteBatch(
@@ -248,7 +245,7 @@ private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeI
248245
DATA1_TABLE_INFO.getSchemaId(),
249246
maxSizeInBytes,
250247
DATA1_ROW_TYPE,
251-
ArrowCompressionInfo.NO_COMPRESSION),
248+
DEFAULT_COMPRESSION),
252249
new UnmanagedPagedOutputView(128),
253250
System.currentTimeMillis());
254251
}

fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
/** Compression information for Arrow record batches. */
2626
public class ArrowCompressionInfo {
2727

28+
public static final ArrowCompressionInfo DEFAULT_COMPRESSION =
29+
new ArrowCompressionInfo(ArrowCompressionType.ZSTD, 3);
2830
public static final ArrowCompressionInfo NO_COMPRESSION =
2931
new ArrowCompressionInfo(ArrowCompressionType.NONE, -1);
3032

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -929,8 +929,7 @@ public class ConfigOptions {
929929
public static final ConfigOption<ArrowCompressionType> TABLE_LOG_ARROW_COMPRESSION_TYPE =
930930
key("table.log.arrow.compression.type")
931931
.enumType(ArrowCompressionType.class)
932-
// TODO: change to ZSTD by default when it is stable
933-
.defaultValue(ArrowCompressionType.NONE)
932+
.defaultValue(ArrowCompressionType.ZSTD)
934933
.withDescription(
935934
"The compression type of the log records if the log format is set to 'ARROW'. "
936935
+ "The candidate compression type is "

fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@
2828
import org.junit.jupiter.params.provider.Arguments;
2929
import org.junit.jupiter.params.provider.MethodSource;
3030

31+
import java.io.EOFException;
3132
import java.io.File;
3233
import java.nio.ByteOrder;
3334
import java.util.ArrayList;
3435
import java.util.List;
3536
import java.util.stream.Stream;
3637

37-
import static com.alibaba.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
38+
import static com.alibaba.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
3839
import static com.alibaba.fluss.record.LogRecordReadContext.createArrowReadContext;
3940
import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID;
4041
import static com.alibaba.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
@@ -52,7 +53,7 @@ class FileLogProjectionTest {
5253
void testSetCurrentProjection() {
5354
FileLogProjection projection = new FileLogProjection();
5455
projection.setCurrentProjection(
55-
1L, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {0, 2});
56+
1L, TestData.DATA2_ROW_TYPE, DEFAULT_COMPRESSION, new int[] {0, 2});
5657
FileLogProjection.ProjectionInfo info1 = projection.currentProjection;
5758
assertThat(info1).isNotNull();
5859
assertThat(info1.nodesProjection.stream().toArray()).isEqualTo(new int[] {0, 2});
@@ -61,7 +62,8 @@ void testSetCurrentProjection() {
6162
assertThat(projection.projectionsCache).hasSize(1);
6263
assertThat(projection.projectionsCache.get(1L)).isSameAs(info1);
6364

64-
projection.setCurrentProjection(2L, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {1});
65+
projection.setCurrentProjection(
66+
2L, TestData.DATA2_ROW_TYPE, DEFAULT_COMPRESSION, new int[] {1});
6567
FileLogProjection.ProjectionInfo info2 = projection.currentProjection;
6668
assertThat(info2).isNotNull();
6769
assertThat(info2.nodesProjection.stream().toArray()).isEqualTo(new int[] {1});
@@ -71,13 +73,16 @@ void testSetCurrentProjection() {
7173
assertThat(projection.projectionsCache.get(2L)).isSameAs(info2);
7274

7375
projection.setCurrentProjection(
74-
1L, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {0, 2});
76+
1L, TestData.DATA2_ROW_TYPE, DEFAULT_COMPRESSION, new int[] {0, 2});
7577
assertThat(projection.currentProjection).isNotNull().isSameAs(info1);
7678

7779
assertThatThrownBy(
7880
() ->
7981
projection.setCurrentProjection(
80-
1L, TestData.DATA1_ROW_TYPE, NO_COMPRESSION, new int[] {1}))
82+
1L,
83+
TestData.DATA1_ROW_TYPE,
84+
DEFAULT_COMPRESSION,
85+
new int[] {1}))
8186
.isInstanceOf(InvalidColumnProjectionException.class)
8287
.hasMessage("The schema and projection should be identical for the same table id.");
8388
}
@@ -89,7 +94,10 @@ void testIllegalSetCurrentProjection() {
8994
assertThatThrownBy(
9095
() ->
9196
projection.setCurrentProjection(
92-
1L, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {3}))
97+
1L,
98+
TestData.DATA2_ROW_TYPE,
99+
DEFAULT_COMPRESSION,
100+
new int[] {3}))
93101
.isInstanceOf(InvalidColumnProjectionException.class)
94102
.hasMessage("Projected fields [3] is out of bound for schema with 3 fields.");
95103

@@ -98,7 +106,7 @@ void testIllegalSetCurrentProjection() {
98106
projection.setCurrentProjection(
99107
1L,
100108
TestData.DATA2_ROW_TYPE,
101-
NO_COMPRESSION,
109+
DEFAULT_COMPRESSION,
102110
new int[] {1, 0}))
103111
.isInstanceOf(InvalidColumnProjectionException.class)
104112
.hasMessage("The projection indexes should be in field order, but is [1, 0]");
@@ -108,7 +116,7 @@ void testIllegalSetCurrentProjection() {
108116
projection.setCurrentProjection(
109117
1L,
110118
TestData.DATA2_ROW_TYPE,
111-
NO_COMPRESSION,
119+
DEFAULT_COMPRESSION,
112120
new int[] {0, 0, 0}))
113121
.isInstanceOf(InvalidColumnProjectionException.class)
114122
.hasMessage(
@@ -158,15 +166,17 @@ void testIllegalByteOrder() throws Exception {
158166
FileLogProjection projection = new FileLogProjection();
159167
// deliberately overwrite the header buffer with the wrong (big-endian) decoding byte order
160168
projection.getLogHeaderBuffer().order(ByteOrder.BIG_ENDIAN);
161-
// should return empty results as no record batch can be decoded.
162-
List<Object[]> results =
163-
doProjection(
164-
projection,
165-
fileLogRecords,
166-
TestData.DATA1_ROW_TYPE,
167-
new int[] {0},
168-
Integer.MAX_VALUE);
169-
assertThat(results).isEmpty();
169+
// should throw exception.
170+
assertThatThrownBy(
171+
() ->
172+
doProjection(
173+
projection,
174+
fileLogRecords,
175+
TestData.DATA1_ROW_TYPE,
176+
new int[] {0, 1},
177+
Integer.MAX_VALUE))
178+
.isInstanceOf(EOFException.class)
179+
.hasMessageContaining("Failed to read `arrow header` from file channel");
170180
}
171181

172182
@Test
@@ -234,7 +244,7 @@ private List<Object[]> doProjection(
234244
int[] projectedFields,
235245
int fetchMaxBytes)
236246
throws Exception {
237-
projection.setCurrentProjection(1L, rowType, NO_COMPRESSION, projectedFields);
247+
projection.setCurrentProjection(1L, rowType, DEFAULT_COMPRESSION, projectedFields);
238248
RowType projectedType = rowType.project(projectedFields);
239249
LogRecords project =
240250
projection.project(

fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.util.List;
4747
import java.util.stream.Collectors;
4848

49+
import static com.alibaba.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
50+
import static com.alibaba.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
4951
import static com.alibaba.fluss.record.TestData.DATA1;
5052
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
5153
import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID;
@@ -79,11 +81,7 @@ void testAppendWithEmptyRecord() throws Exception {
7981
int maxSizeInBytes = 1024;
8082
ArrowWriter writer =
8183
provider.getOrCreateWriter(
82-
1L,
83-
DEFAULT_SCHEMA_ID,
84-
maxSizeInBytes,
85-
DATA1_ROW_TYPE,
86-
ArrowCompressionInfo.NO_COMPRESSION);
84+
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, DEFAULT_COMPRESSION);
8785
MemoryLogRecordsArrowBuilder builder =
8886
createMemoryLogRecordsArrowBuilder(0, writer, 10, 100);
8987
assertThat(builder.isFull()).isFalse();
@@ -105,11 +103,7 @@ void testAppend() throws Exception {
105103
int maxSizeInBytes = 1024;
106104
ArrowWriter writer =
107105
provider.getOrCreateWriter(
108-
1L,
109-
DEFAULT_SCHEMA_ID,
110-
maxSizeInBytes,
111-
DATA1_ROW_TYPE,
112-
ArrowCompressionInfo.NO_COMPRESSION);
106+
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, DEFAULT_COMPRESSION);
113107
MemoryLogRecordsArrowBuilder builder =
114108
createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
115109
List<RowKind> rowKinds =
@@ -160,11 +154,7 @@ void testCompression(ArrowCompressionInfo compressionInfo) throws Exception {
160154
// first create an uncompressed batch.
161155
ArrowWriter writer1 =
162156
provider.getOrCreateWriter(
163-
1L,
164-
DEFAULT_SCHEMA_ID,
165-
maxSizeInBytes,
166-
DATA1_ROW_TYPE,
167-
ArrowCompressionInfo.NO_COMPRESSION);
157+
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, NO_COMPRESSION);
168158
MemoryLogRecordsArrowBuilder builder =
169159
createMemoryLogRecordsArrowBuilder(0, writer1, 10, 1024);
170160
for (Object[] data : dataSet) {
@@ -204,7 +194,7 @@ void testIllegalArgument() {
204194
DEFAULT_SCHEMA_ID,
205195
maxSizeInBytes,
206196
DATA1_ROW_TYPE,
207-
ArrowCompressionInfo.NO_COMPRESSION)) {
197+
DEFAULT_COMPRESSION)) {
208198
createMemoryLogRecordsArrowBuilder(0, writer, 10, 30);
209199
}
210200
})
@@ -218,11 +208,7 @@ void testClose() throws Exception {
218208
int maxSizeInBytes = 1024;
219209
ArrowWriter writer =
220210
provider.getOrCreateWriter(
221-
1L,
222-
DEFAULT_SCHEMA_ID,
223-
1024,
224-
DATA1_ROW_TYPE,
225-
ArrowCompressionInfo.NO_COMPRESSION);
211+
1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE, NO_COMPRESSION);
226212
MemoryLogRecordsArrowBuilder builder =
227213
createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
228214
List<RowKind> rowKinds =
@@ -247,11 +233,7 @@ void testClose() throws Exception {
247233
// get the writer again; it will be re-initialized.
248234
ArrowWriter writer1 =
249235
provider.getOrCreateWriter(
250-
1L,
251-
DEFAULT_SCHEMA_ID,
252-
maxSizeInBytes,
253-
DATA1_ROW_TYPE,
254-
ArrowCompressionInfo.NO_COMPRESSION);
236+
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, NO_COMPRESSION);
255237
assertThat(provider.freeWriters().get(tableSchemaId).size()).isEqualTo(0);
256238

257239
// Even if the writer has been re-initialized, the sizeInBytes should be the same.
@@ -266,11 +248,7 @@ void testNoRecordAppend() throws Exception {
266248
// 1. no record append with base offset as 0.
267249
ArrowWriter writer =
268250
provider.getOrCreateWriter(
269-
1L,
270-
DEFAULT_SCHEMA_ID,
271-
1024 * 10,
272-
DATA1_ROW_TYPE,
273-
ArrowCompressionInfo.NO_COMPRESSION);
251+
1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE, DEFAULT_COMPRESSION);
274252
MemoryLogRecordsArrowBuilder builder =
275253
createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10);
276254
MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
@@ -296,11 +274,7 @@ void testNoRecordAppend() throws Exception {
296274
// 2. no record append with base offset as 100.
297275
ArrowWriter writer2 =
298276
provider.getOrCreateWriter(
299-
1L,
300-
DEFAULT_SCHEMA_ID,
301-
1024 * 10,
302-
DATA1_ROW_TYPE,
303-
ArrowCompressionInfo.NO_COMPRESSION);
277+
1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE, DEFAULT_COMPRESSION);
304278
builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 * 10);
305279
memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
306280
// only contains batch header.
@@ -328,11 +302,7 @@ void testResetWriterState() throws Exception {
328302
int maxSizeInBytes = 1024;
329303
ArrowWriter writer =
330304
provider.getOrCreateWriter(
331-
1L,
332-
DEFAULT_SCHEMA_ID,
333-
maxSizeInBytes,
334-
DATA1_ROW_TYPE,
335-
ArrowCompressionInfo.NO_COMPRESSION);
305+
1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, DEFAULT_COMPRESSION);
336306
MemoryLogRecordsArrowBuilder builder =
337307
createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
338308
List<RowKind> rowKinds =

fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowReaderWriterTest.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.alibaba.fluss.row.arrow;
1818

19-
import com.alibaba.fluss.compression.ArrowCompressionInfo;
2019
import com.alibaba.fluss.memory.AbstractPagedOutputView;
2120
import com.alibaba.fluss.memory.ManagedPagedOutputView;
2221
import com.alibaba.fluss.memory.MemorySegment;
@@ -42,6 +41,8 @@
4241
import java.util.Arrays;
4342
import java.util.List;
4443

44+
import static com.alibaba.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
45+
import static com.alibaba.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
4546
import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET;
4647
import static com.alibaba.fluss.record.TestData.DATA1;
4748
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
@@ -136,11 +137,7 @@ void testReaderWriter() throws IOException {
136137
ArrowWriterPool provider = new ArrowWriterPool(allocator);
137138
ArrowWriter writer =
138139
provider.getOrCreateWriter(
139-
1L,
140-
1,
141-
Integer.MAX_VALUE,
142-
rowType,
143-
ArrowCompressionInfo.NO_COMPRESSION)) {
140+
1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION)) {
144141
for (InternalRow row : TEST_DATA) {
145142
writer.writeRow(row);
146143
}
@@ -173,7 +170,7 @@ void testWriterExceedMaxSizeInBytes() {
173170
ArrowWriterPool provider = new ArrowWriterPool(allocator);
174171
ArrowWriter writer =
175172
provider.getOrCreateWriter(
176-
1L, 1, 1024, DATA1_ROW_TYPE, ArrowCompressionInfo.NO_COMPRESSION)) {
173+
1L, 1, 1024, DATA1_ROW_TYPE, DEFAULT_COMPRESSION)) {
177174
while (!writer.isFull()) {
178175
writer.writeRow(row(DATA1.get(0)));
179176
}

0 commit comments

Comments
 (0)