Skip to content

Commit 3f19a6a

Browse files
committed
[format] Fix none compression for csv and json formats
1 parent c55afa0 commit 3f19a6a

File tree

19 files changed

+184
-251
lines changed

19 files changed

+184
-251
lines changed

paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java renamed to paimon-common/src/main/java/org/apache/paimon/format/TextCompressionType.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727

2828
import static org.apache.paimon.options.description.TextElement.text;
2929

30-
/** Compression types supported by Paimon file formats. */
31-
public enum CompressionType implements DescribedEnum {
30+
/** Compression types supported by text (csv, json) file formats. */
31+
public enum TextCompressionType implements DescribedEnum {
3232
NONE("none", "No compression.", null, ""),
3333
GZIP(
3434
"gzip",
@@ -70,8 +70,8 @@ public enum CompressionType implements DescribedEnum {
7070

7171
static {
7272
Set<String> extensions = new HashSet<>();
73-
for (CompressionType type : CompressionType.values()) {
74-
if (type != CompressionType.NONE
73+
for (TextCompressionType type : TextCompressionType.values()) {
74+
if (type != TextCompressionType.NONE
7575
&& type.fileExtension() != null
7676
&& !type.fileExtension().isEmpty()) {
7777
extensions.add(type.fileExtension().toLowerCase());
@@ -80,7 +80,7 @@ public enum CompressionType implements DescribedEnum {
8080
SUPPORTED_EXTENSIONS = Collections.unmodifiableSet(extensions);
8181
}
8282

83-
CompressionType(String value, String description, String className, String fileExtension) {
83+
TextCompressionType(String value, String description, String className, String fileExtension) {
8484
this.value = value;
8585
this.description = description;
8686
this.className = className;
@@ -109,12 +109,12 @@ public String fileExtension() {
109109
return fileExtension;
110110
}
111111

112-
public static CompressionType fromValue(String value) {
112+
public static TextCompressionType fromValue(String value) {
113113
if (value == null || value.isEmpty()) {
114114
return NONE;
115115
}
116116

117-
for (CompressionType type : CompressionType.values()) {
117+
for (TextCompressionType type : TextCompressionType.values()) {
118118
if (type.value.equalsIgnoreCase(value)) {
119119
return type;
120120
}
@@ -123,12 +123,12 @@ public static CompressionType fromValue(String value) {
123123
}
124124

125125
/**
126-
* Check if the given extension is a supported compression extension.
126+
* Check if the given extension is a supported text compression extension.
127127
*
128128
* @param extension the file extension to check
129-
* @return true if the extension is a supported compression extension, false otherwise
129+
* @return true if the extension is a supported text compression extension, false otherwise
130130
*/
131-
public static boolean isSupportedExtension(String extension) {
131+
public static boolean isTextCompression(String extension) {
132132
if (extension == null || extension.isEmpty()) {
133133
return false;
134134
}

paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
package org.apache.paimon.io;
2020

2121
import org.apache.paimon.annotation.VisibleForTesting;
22-
import org.apache.paimon.format.CompressionType;
22+
import org.apache.paimon.format.TextCompressionType;
2323
import org.apache.paimon.fs.ExternalPathProvider;
2424
import org.apache.paimon.fs.Path;
2525
import org.apache.paimon.manifest.FileEntry;
26+
import org.apache.paimon.utils.StringUtils;
2627

2728
import javax.annotation.Nullable;
2829
import javax.annotation.concurrent.ThreadSafe;
@@ -31,6 +32,9 @@
3132
import java.util.UUID;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334

35+
import static org.apache.paimon.format.TextCompressionType.isTextCompression;
36+
import static org.apache.paimon.utils.Preconditions.checkArgument;
37+
3438
/** Factory which produces new {@link Path}s for data files. */
3539
@ThreadSafe
3640
public class DataFilePathFactory {
@@ -90,15 +94,17 @@ public Path newPath(String prefix) {
9094
private String newFileName(String prefix) {
9195
String extension;
9296
if (isTextFormat(formatIdentifier)) {
93-
String compressionExtension =
94-
CompressionType.fromValue(fileCompression).fileExtension();
95-
extension = "." + formatIdentifier + "." + compressionExtension;
96-
} else if (fileSuffixIncludeCompression) {
97-
String compressionExtension =
98-
CompressionType.fromValue(fileCompression).fileExtension();
99-
extension = "." + compressionExtension + "." + formatIdentifier;
97+
String compressExtension =
98+
TextCompressionType.fromValue(fileCompression).fileExtension();
99+
extension =
100+
StringUtils.isEmpty(compressExtension)
101+
? "." + formatIdentifier
102+
: "." + formatIdentifier + "." + compressExtension;
100103
} else {
101-
extension = "." + formatIdentifier;
104+
extension =
105+
fileSuffixIncludeCompression
106+
? "." + fileCompression + "." + formatIdentifier
107+
: "." + formatIdentifier;
102108
}
103109
return newFileName(prefix, extension);
104110
}
@@ -165,20 +171,13 @@ public static Path createNewFileIndexFilePath(Path filePath) {
165171

166172
public static String formatIdentifier(String fileName) {
167173
int index = fileName.lastIndexOf('.');
168-
if (index == -1) {
169-
throw new IllegalArgumentException(fileName + " is not a legal file name.");
170-
}
174+
checkArgument(index != -1, "%s is not a legal file name.", fileName);
171175

172176
String extension = fileName.substring(index + 1);
173-
if (CompressionType.isSupportedExtension(extension)) {
177+
if (isTextCompression(extension)) {
174178
int secondLastDot = fileName.lastIndexOf('.', index - 1);
175-
if (secondLastDot != -1) {
176-
String formatIdentifier = fileName.substring(secondLastDot + 1, index);
177-
// If the format is json or csv, return that instead of the compression extension
178-
if (isTextFormat(formatIdentifier)) {
179-
return formatIdentifier;
180-
}
181-
}
179+
checkArgument(secondLastDot != -1, "%s is not a legal file name.", fileName);
180+
return fileName.substring(secondLastDot + 1, index);
182181
}
183182

184183
return extension;

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,36 +65,45 @@ protected List<String> ddl() {
6565

6666
@Test
6767
public void testCsvFileFormat() {
68-
sql("CREATE TABLE CSV (a INT, b INT, c INT) WITH ('file.format'='csv')");
69-
sql("INSERT INTO CSV VALUES (1, 2, 3)");
70-
assertThat(sql("SELECT * FROM CSV")).containsExactly(Row.of(1, 2, 3));
68+
innerTestTextFileFormat("csv");
69+
}
7170

72-
sql(
73-
"CREATE TABLE CSV_GZIP (a INT, b INT, c INT) WITH ('file.format'='csv', 'file.compression'='gzip')");
74-
sql("INSERT INTO CSV_GZIP VALUES (1, 2, 3)");
75-
assertThat(sql("SELECT * FROM CSV_GZIP")).containsExactly(Row.of(1, 2, 3));
71+
@Test
72+
public void testJsonFileFormat() {
73+
innerTestTextFileFormat("json");
74+
}
75+
76+
private void innerTestTextFileFormat(String format) {
77+
sql("CREATE TABLE TEXT_T (a INT, b INT, c INT) WITH ('file.format'='%s')", format);
78+
sql("INSERT INTO TEXT_T VALUES (1, 2, 3)");
79+
assertThat(sql("SELECT * FROM TEXT_T")).containsExactly(Row.of(1, 2, 3));
7680
List<String> files =
77-
sql("select file_path from `CSV_GZIP$files`").stream()
81+
sql("select file_path from `TEXT_T$files`").stream()
7882
.map(r -> r.getField(0).toString())
7983
.collect(Collectors.toList());
80-
assertThat(files).allMatch(file -> file.endsWith("csv.gz"));
81-
}
84+
assertThat(files).allMatch(file -> file.endsWith(format + ".zst"));
8285

83-
@Test
84-
public void testJsonFileFormat() {
85-
sql("CREATE TABLE JSON_T (a INT, b INT, c INT) WITH ('file.format'='json')");
86-
sql("INSERT INTO JSON_T VALUES (1, 2, 3)");
87-
assertThat(sql("SELECT * FROM JSON_T")).containsExactly(Row.of(1, 2, 3));
86+
sql(
87+
"CREATE TABLE TEXT_NONE (a INT, b INT, c INT) WITH ('file.format'='%s', 'file.compression'='none')",
88+
format);
89+
sql("INSERT INTO TEXT_NONE VALUES (1, 2, 3)");
90+
assertThat(sql("SELECT * FROM TEXT_NONE")).containsExactly(Row.of(1, 2, 3));
91+
files =
92+
sql("select file_path from `TEXT_NONE$files`").stream()
93+
.map(r -> r.getField(0).toString())
94+
.collect(Collectors.toList());
95+
assertThat(files).allMatch(file -> file.endsWith(format));
8896

8997
sql(
90-
"CREATE TABLE JSON_GZIP (a INT, b INT, c INT) WITH ('file.format'='json', 'file.compression'='gzip')");
91-
sql("INSERT INTO JSON_GZIP VALUES (1, 2, 3)");
92-
assertThat(sql("SELECT * FROM JSON_GZIP")).containsExactly(Row.of(1, 2, 3));
93-
List<String> files =
94-
sql("select file_path from `JSON_GZIP$files`").stream()
98+
"CREATE TABLE TEXT_GZIP (a INT, b INT, c INT) WITH ('file.format'='%s', 'file.compression'='gzip')",
99+
format);
100+
sql("INSERT INTO TEXT_GZIP VALUES (1, 2, 3)");
101+
assertThat(sql("SELECT * FROM TEXT_GZIP")).containsExactly(Row.of(1, 2, 3));
102+
files =
103+
sql("select file_path from `TEXT_GZIP$files`").stream()
95104
.map(r -> r.getField(0).toString())
96105
.collect(Collectors.toList());
97-
assertThat(files).allMatch(file -> file.endsWith("json.gz"));
106+
assertThat(files).allMatch(file -> file.endsWith(format + ".gz"));
98107
}
99108

100109
@Test

paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@
1919
package org.apache.paimon.format.csv;
2020

2121
import org.apache.paimon.data.InternalRow;
22-
import org.apache.paimon.format.CompressionType;
2322
import org.apache.paimon.format.FileFormat;
2423
import org.apache.paimon.format.FileFormatFactory.FormatContext;
2524
import org.apache.paimon.format.FormatReaderFactory;
2625
import org.apache.paimon.format.FormatWriter;
2726
import org.apache.paimon.format.FormatWriterFactory;
28-
import org.apache.paimon.format.TextCompression;
27+
import org.apache.paimon.format.TextCompressionType;
28+
import org.apache.paimon.format.text.TextCompression;
2929
import org.apache.paimon.fs.CloseShieldOutputStream;
3030
import org.apache.paimon.fs.PositionOutputStream;
31-
import org.apache.paimon.options.Options;
3231
import org.apache.paimon.predicate.Predicate;
32+
import org.apache.paimon.reader.FileRecordReader;
3333
import org.apache.paimon.types.DataType;
3434
import org.apache.paimon.types.DataTypeRoot;
3535
import org.apache.paimon.types.RowType;
@@ -44,15 +44,11 @@ public class CsvFileFormat extends FileFormat {
4444

4545
public static final String CSV_IDENTIFIER = "csv";
4646

47-
private final Options options;
47+
private final CsvOptions options;
4848

4949
public CsvFileFormat(FormatContext context) {
50-
this(context, CSV_IDENTIFIER);
51-
}
52-
53-
public CsvFileFormat(FormatContext context, String identifier) {
54-
super(identifier);
55-
this.options = context.options();
50+
super(CSV_IDENTIFIER);
51+
this.options = new CsvOptions(context.options());
5652
}
5753

5854
@Override
@@ -102,22 +98,39 @@ private void validateDataType(DataType dataType) {
10298
}
10399
}
104100

101+
/** CSV {@link FormatReaderFactory} implementation. */
102+
private static class CsvReaderFactory implements FormatReaderFactory {
103+
104+
private final RowType rowType;
105+
private final CsvOptions options;
106+
107+
public CsvReaderFactory(RowType rowType, CsvOptions options) {
108+
this.rowType = rowType;
109+
this.options = options;
110+
}
111+
112+
@Override
113+
public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
114+
return new CsvFileReader(context.fileIO(), context.filePath(), rowType, options);
115+
}
116+
}
117+
105118
/** A {@link FormatWriterFactory} to write {@link InternalRow} to CSV. */
106119
private static class CsvWriterFactory implements FormatWriterFactory {
107120

108121
private final RowType rowType;
109-
private final Options options;
122+
private final CsvOptions options;
110123

111-
public CsvWriterFactory(RowType rowType, Options options) {
124+
public CsvWriterFactory(RowType rowType, CsvOptions options) {
112125
this.rowType = rowType;
113126
this.options = options;
114127
}
115128

116129
@Override
117130
public FormatWriter create(PositionOutputStream out, String compression)
118131
throws IOException {
119-
CompressionType compressionType =
120-
TextCompression.getTextCompressionType(compression, options);
132+
TextCompressionType compressionType =
133+
TextCompression.getTextCompressionType(compression);
121134
return new CsvFormatWriter(
122135
new CloseShieldOutputStream(out), rowType, options, compressionType);
123136
}

paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@
2323
import org.apache.paimon.data.BinaryString;
2424
import org.apache.paimon.data.GenericRow;
2525
import org.apache.paimon.data.InternalRow;
26-
import org.apache.paimon.format.BaseTextFileReader;
26+
import org.apache.paimon.format.text.BaseTextFileReader;
2727
import org.apache.paimon.fs.FileIO;
2828
import org.apache.paimon.fs.Path;
29-
import org.apache.paimon.options.Options;
3029
import org.apache.paimon.types.DataType;
3130
import org.apache.paimon.types.DataTypeRoot;
3231
import org.apache.paimon.types.DataTypes;
@@ -54,10 +53,10 @@ public class CsvFileReader extends BaseTextFileReader {
5453
private final CsvSchema schema;
5554
private boolean headerSkipped = false;
5655

57-
public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, Options options)
56+
public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, CsvOptions options)
5857
throws IOException {
59-
super(fileIO, filePath, rowType, options);
60-
this.formatOptions = new CsvOptions(options);
58+
super(fileIO, filePath, rowType);
59+
this.formatOptions = options;
6160
this.schema =
6261
CsvSchema.emptySchema()
6362
.withQuoteChar(formatOptions.quoteCharacter().charAt(0))

paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@
2121
import org.apache.paimon.casting.CastExecutor;
2222
import org.apache.paimon.casting.CastExecutors;
2323
import org.apache.paimon.data.InternalRow;
24-
import org.apache.paimon.format.BaseTextFileWriter;
25-
import org.apache.paimon.format.CompressionType;
24+
import org.apache.paimon.format.TextCompressionType;
25+
import org.apache.paimon.format.text.BaseTextFileWriter;
2626
import org.apache.paimon.fs.PositionOutputStream;
27-
import org.apache.paimon.options.Options;
2827
import org.apache.paimon.types.DataType;
2928
import org.apache.paimon.types.DataTypeRoot;
3029
import org.apache.paimon.types.RowType;
@@ -47,10 +46,13 @@ public class CsvFormatWriter extends BaseTextFileWriter {
4746
private final StringBuilder stringBuilder;
4847

4948
public CsvFormatWriter(
50-
PositionOutputStream out, RowType rowType, Options options, CompressionType compression)
49+
PositionOutputStream out,
50+
RowType rowType,
51+
CsvOptions options,
52+
TextCompressionType compression)
5153
throws IOException {
52-
super(out, rowType, options, compression);
53-
this.csvOptions = new CsvOptions(options);
54+
super(out, rowType, compression);
55+
this.csvOptions = options;
5456
this.stringBuilder = new StringBuilder();
5557
}
5658

paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

0 commit comments

Comments
 (0)