Skip to content

Commit c55afa0

Browse files
authored
[core] text formats support compress (apache#6103)
1 parent f970245 commit c55afa0

File tree

21 files changed

+821
-77
lines changed

21 files changed

+821
-77
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.format;
20+
21+
import org.apache.paimon.options.description.DescribedEnum;
22+
import org.apache.paimon.options.description.InlineElement;
23+
24+
import java.util.Collections;
25+
import java.util.HashSet;
26+
import java.util.Set;
27+
28+
import static org.apache.paimon.options.description.TextElement.text;
29+
30+
/** Compression types supported by Paimon file formats. */
31+
public enum CompressionType implements DescribedEnum {
32+
NONE("none", "No compression.", null, ""),
33+
GZIP(
34+
"gzip",
35+
"GZIP compression using the deflate algorithm.",
36+
"org.apache.hadoop.io.compress.GzipCodec",
37+
"gz"),
38+
BZIP2(
39+
"bzip2",
40+
"BZIP2 compression using the Burrows-Wheeler algorithm.",
41+
"org.apache.hadoop.io.compress.BZip2Codec",
42+
"bz2"),
43+
DEFLATE(
44+
"deflate",
45+
"DEFLATE compression using the deflate algorithm.",
46+
"org.apache.hadoop.io.compress.DeflateCodec",
47+
"deflate"),
48+
SNAPPY(
49+
"snappy",
50+
"Snappy compression for fast compression and decompression.",
51+
"org.apache.hadoop.io.compress.SnappyCodec",
52+
"snappy"),
53+
LZ4(
54+
"lz4",
55+
"LZ4 compression for very fast compression and decompression.",
56+
"org.apache.hadoop.io.compress.Lz4Codec",
57+
"lz4"),
58+
ZSTD(
59+
"zstd",
60+
"Zstandard compression for high compression ratio and speed.",
61+
"org.apache.hadoop.io.compress.ZStandardCodec",
62+
"zst");
63+
64+
private final String value;
65+
private final String className;
66+
private final String fileExtension;
67+
private final String description;
68+
69+
private static final Set<String> SUPPORTED_EXTENSIONS;
70+
71+
static {
72+
Set<String> extensions = new HashSet<>();
73+
for (CompressionType type : CompressionType.values()) {
74+
if (type != CompressionType.NONE
75+
&& type.fileExtension() != null
76+
&& !type.fileExtension().isEmpty()) {
77+
extensions.add(type.fileExtension().toLowerCase());
78+
}
79+
}
80+
SUPPORTED_EXTENSIONS = Collections.unmodifiableSet(extensions);
81+
}
82+
83+
CompressionType(String value, String description, String className, String fileExtension) {
84+
this.value = value;
85+
this.description = description;
86+
this.className = className;
87+
this.fileExtension = fileExtension;
88+
}
89+
90+
@Override
91+
public String toString() {
92+
return value;
93+
}
94+
95+
@Override
96+
public InlineElement getDescription() {
97+
return text(description);
98+
}
99+
100+
public String value() {
101+
return value;
102+
}
103+
104+
public String hadoopCodecClassName() {
105+
return className;
106+
}
107+
108+
public String fileExtension() {
109+
return fileExtension;
110+
}
111+
112+
public static CompressionType fromValue(String value) {
113+
if (value == null || value.isEmpty()) {
114+
return NONE;
115+
}
116+
117+
for (CompressionType type : CompressionType.values()) {
118+
if (type.value.equalsIgnoreCase(value)) {
119+
return type;
120+
}
121+
}
122+
return NONE;
123+
}
124+
125+
/**
126+
* Check if the given extension is a supported compression extension.
127+
*
128+
* @param extension the file extension to check
129+
* @return true if the extension is a supported compression extension, false otherwise
130+
*/
131+
public static boolean isSupportedExtension(String extension) {
132+
if (extension == null || extension.isEmpty()) {
133+
return false;
134+
}
135+
return SUPPORTED_EXTENSIONS.contains(extension.toLowerCase());
136+
}
137+
}

paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ public boolean supportNestedReadPruning() {
140140
return true;
141141
}
142142

143+
public String compression() {
144+
return "zstd";
145+
}
146+
143147
@Test
144148
public void testNestedReadPruning() throws Exception {
145149
if (!supportNestedReadPruning()) {
@@ -250,10 +254,10 @@ protected void write(FormatWriterFactory factory, Path file, InternalRow... rows
250254
FormatWriter writer;
251255
PositionOutputStream out = null;
252256
if (factory instanceof SupportsDirectWrite) {
253-
writer = ((SupportsDirectWrite) factory).create(fileIO, file, "zstd");
257+
writer = ((SupportsDirectWrite) factory).create(fileIO, file, this.compression());
254258
} else {
255259
out = fileIO.newOutputStream(file, false);
256-
writer = factory.create(out, "zstd");
260+
writer = factory.create(out, this.compression());
257261
}
258262
for (InternalRow row : rows) {
259263
writer.addElement(row);
@@ -381,7 +385,7 @@ public void testWriteAndReadFileWithoutExtension() throws IOException {
381385
Path filePath = new Path(parent, UUID.randomUUID().toString());
382386
FormatWriterFactory writerFactory = jsonFormat.createWriterFactory(rowType);
383387
try (FormatWriter writer =
384-
writerFactory.create(fileIO.newOutputStream(filePath, false), "none")) {
388+
writerFactory.create(fileIO.newOutputStream(filePath, false), compression())) {
385389
for (InternalRow row : testData) {
386390
writer.addElement(row);
387391
}

paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.io;
2020

2121
import org.apache.paimon.annotation.VisibleForTesting;
22+
import org.apache.paimon.format.CompressionType;
2223
import org.apache.paimon.fs.ExternalPathProvider;
2324
import org.apache.paimon.fs.Path;
2425
import org.apache.paimon.manifest.FileEntry;
@@ -88,8 +89,14 @@ public Path newPath(String prefix) {
8889

8990
private String newFileName(String prefix) {
9091
String extension;
91-
if (fileSuffixIncludeCompression) {
92-
extension = "." + fileCompression + "." + formatIdentifier;
92+
if (isTextFormat(formatIdentifier)) {
93+
String compressionExtension =
94+
CompressionType.fromValue(fileCompression).fileExtension();
95+
extension = "." + formatIdentifier + "." + compressionExtension;
96+
} else if (fileSuffixIncludeCompression) {
97+
String compressionExtension =
98+
CompressionType.fromValue(fileCompression).fileExtension();
99+
extension = "." + compressionExtension + "." + formatIdentifier;
93100
} else {
94101
extension = "." + formatIdentifier;
95102
}
@@ -162,7 +169,19 @@ public static String formatIdentifier(String fileName) {
162169
throw new IllegalArgumentException(fileName + " is not a legal file name.");
163170
}
164171

165-
return fileName.substring(index + 1);
172+
String extension = fileName.substring(index + 1);
173+
if (CompressionType.isSupportedExtension(extension)) {
174+
int secondLastDot = fileName.lastIndexOf('.', index - 1);
175+
if (secondLastDot != -1) {
176+
String formatIdentifier = fileName.substring(secondLastDot + 1, index);
177+
// If the format is json or csv, return that instead of the compression extension
178+
if (isTextFormat(formatIdentifier)) {
179+
return formatIdentifier;
180+
}
181+
}
182+
}
183+
184+
return extension;
166185
}
167186

168187
public boolean isExternalPath() {
@@ -173,4 +192,9 @@ public boolean isExternalPath() {
173192
String uuid() {
174193
return uuid;
175194
}
195+
196+
private static boolean isTextFormat(String formatIdentifier) {
197+
return "json".equalsIgnoreCase(formatIdentifier)
198+
|| "csv".equalsIgnoreCase(formatIdentifier);
199+
}
176200
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,33 @@ public void testCsvFileFormat() {
6868
sql("CREATE TABLE CSV (a INT, b INT, c INT) WITH ('file.format'='csv')");
6969
sql("INSERT INTO CSV VALUES (1, 2, 3)");
7070
assertThat(sql("SELECT * FROM CSV")).containsExactly(Row.of(1, 2, 3));
71+
72+
sql(
73+
"CREATE TABLE CSV_GZIP (a INT, b INT, c INT) WITH ('file.format'='csv', 'file.compression'='gzip')");
74+
sql("INSERT INTO CSV_GZIP VALUES (1, 2, 3)");
75+
assertThat(sql("SELECT * FROM CSV_GZIP")).containsExactly(Row.of(1, 2, 3));
76+
List<String> files =
77+
sql("select file_path from `CSV_GZIP$files`").stream()
78+
.map(r -> r.getField(0).toString())
79+
.collect(Collectors.toList());
80+
assertThat(files).allMatch(file -> file.endsWith("csv.gz"));
7181
}
7282

7383
@Test
7484
public void testJsonFileFormat() {
7585
sql("CREATE TABLE JSON_T (a INT, b INT, c INT) WITH ('file.format'='json')");
7686
sql("INSERT INTO JSON_T VALUES (1, 2, 3)");
7787
assertThat(sql("SELECT * FROM JSON_T")).containsExactly(Row.of(1, 2, 3));
88+
89+
sql(
90+
"CREATE TABLE JSON_GZIP (a INT, b INT, c INT) WITH ('file.format'='json', 'file.compression'='gzip')");
91+
sql("INSERT INTO JSON_GZIP VALUES (1, 2, 3)");
92+
assertThat(sql("SELECT * FROM JSON_GZIP")).containsExactly(Row.of(1, 2, 3));
93+
List<String> files =
94+
sql("select file_path from `JSON_GZIP$files`").stream()
95+
.map(r -> r.getField(0).toString())
96+
.collect(Collectors.toList());
97+
assertThat(files).allMatch(file -> file.endsWith("json.gz"));
7898
}
7999

80100
@Test

paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.data.InternalRow;
2222
import org.apache.paimon.fs.FileIO;
2323
import org.apache.paimon.fs.Path;
24+
import org.apache.paimon.options.Options;
2425
import org.apache.paimon.reader.FileRecordIterator;
2526
import org.apache.paimon.reader.FileRecordReader;
2627
import org.apache.paimon.types.RowType;
@@ -29,6 +30,7 @@
2930

3031
import java.io.BufferedReader;
3132
import java.io.IOException;
33+
import java.io.InputStream;
3234
import java.io.InputStreamReader;
3335
import java.nio.charset.StandardCharsets;
3436

@@ -37,17 +39,21 @@ public abstract class BaseTextFileReader implements FileRecordReader<InternalRow
3739

3840
protected final Path filePath;
3941
protected final RowType rowType;
42+
protected final InputStream decompressedStream;
4043
protected final BufferedReader bufferedReader;
4144
protected boolean readerClosed = false;
4245
protected BaseTextRecordIterator reader;
4346

44-
protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType rowType) throws IOException {
47+
protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType rowType, Options options)
48+
throws IOException {
4549
this.filePath = filePath;
4650
this.rowType = rowType;
51+
this.decompressedStream =
52+
TextCompression.createDecompressedInputStream(
53+
fileIO.newInputStream(filePath), filePath, options);
4754
this.bufferedReader =
4855
new BufferedReader(
49-
new InputStreamReader(
50-
fileIO.newInputStream(filePath), StandardCharsets.UTF_8));
56+
new InputStreamReader(this.decompressedStream, StandardCharsets.UTF_8));
5157
this.reader = createRecordIterator();
5258
}
5359

@@ -89,8 +95,15 @@ public FileRecordIterator<InternalRow> readBatch() throws IOException {
8995

9096
@Override
9197
public void close() throws IOException {
92-
if (!readerClosed && bufferedReader != null) {
93-
bufferedReader.close();
98+
if (!readerClosed) {
99+
// Close the buffered reader first
100+
if (bufferedReader != null) {
101+
bufferedReader.close();
102+
}
103+
// Explicitly close the decompressed stream to prevent resource leaks
104+
if (decompressedStream != null) {
105+
decompressedStream.close();
106+
}
94107
readerClosed = true;
95108
}
96109
}

paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020

2121
import org.apache.paimon.data.InternalRow;
2222
import org.apache.paimon.fs.PositionOutputStream;
23+
import org.apache.paimon.options.Options;
2324
import org.apache.paimon.types.RowType;
2425

2526
import java.io.BufferedWriter;
2627
import java.io.IOException;
28+
import java.io.OutputStream;
2729
import java.io.OutputStreamWriter;
2830
import java.nio.charset.StandardCharsets;
2931

@@ -34,11 +36,21 @@ public abstract class BaseTextFileWriter implements FormatWriter {
3436
protected final BufferedWriter writer;
3537
protected final RowType rowType;
3638

37-
protected BaseTextFileWriter(PositionOutputStream outputStream, RowType rowType) {
39+
protected BaseTextFileWriter(
40+
PositionOutputStream outputStream,
41+
RowType rowType,
42+
Options formatOptions,
43+
CompressionType compressionType)
44+
throws IOException {
3845
this.outputStream = outputStream;
39-
this.rowType = rowType;
46+
OutputStream compressedStream =
47+
TextCompression.createCompressedOutputStream(
48+
outputStream, compressionType, formatOptions);
4049
this.writer =
41-
new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
50+
new BufferedWriter(
51+
new OutputStreamWriter(compressedStream, StandardCharsets.UTF_8),
52+
getOptimalBufferSize(compressionType));
53+
this.rowType = rowType;
4254
}
4355

4456
/**
@@ -61,4 +73,21 @@ public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws I
6173
}
6274
return false;
6375
}
76+
77+
private int getOptimalBufferSize(CompressionType compressionType) {
78+
switch (compressionType) {
79+
case GZIP:
80+
case DEFLATE:
81+
return 65536; // 64KB for deflate-based compression
82+
case SNAPPY:
83+
case LZ4:
84+
return 131072; // 128KB for fast compression
85+
case ZSTD:
86+
return 262144; // 256KB for high compression ratio
87+
case BZIP2:
88+
return 65536; // 64KB for bzip2
89+
default:
90+
return 65536; // Default 64KB buffer size
91+
}
92+
}
6493
}

0 commit comments

Comments
 (0)