Skip to content

Commit 238386a

Browse files
committed
[format] Fix 'none' compression for csv and json formats
1 parent c55afa0 commit 238386a

File tree

20 files changed

+309
-456
lines changed

20 files changed

+309
-456
lines changed

paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java renamed to paimon-common/src/main/java/org/apache/paimon/format/HadoopCompressionType.java

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@
2121
import org.apache.paimon.options.description.DescribedEnum;
2222
import org.apache.paimon.options.description.InlineElement;
2323

24-
import java.util.Collections;
25-
import java.util.HashSet;
26-
import java.util.Set;
24+
import javax.annotation.Nullable;
25+
26+
import java.util.Optional;
2727

2828
import static org.apache.paimon.options.description.TextElement.text;
2929

30-
/** Compression types supported by Paimon file formats. */
31-
public enum CompressionType implements DescribedEnum {
32-
NONE("none", "No compression.", null, ""),
30+
/** Compression types supported by hadoop compression. */
31+
public enum HadoopCompressionType implements DescribedEnum {
32+
NONE("none", "No compression.", null, null),
3333
GZIP(
3434
"gzip",
3535
"GZIP compression using the deflate algorithm.",
@@ -62,25 +62,15 @@ public enum CompressionType implements DescribedEnum {
6262
"zst");
6363

6464
private final String value;
65-
private final String className;
66-
private final String fileExtension;
6765
private final String description;
68-
69-
private static final Set<String> SUPPORTED_EXTENSIONS;
70-
71-
static {
72-
Set<String> extensions = new HashSet<>();
73-
for (CompressionType type : CompressionType.values()) {
74-
if (type != CompressionType.NONE
75-
&& type.fileExtension() != null
76-
&& !type.fileExtension().isEmpty()) {
77-
extensions.add(type.fileExtension().toLowerCase());
78-
}
79-
}
80-
SUPPORTED_EXTENSIONS = Collections.unmodifiableSet(extensions);
81-
}
82-
83-
CompressionType(String value, String description, String className, String fileExtension) {
66+
private final @Nullable String className;
67+
private final @Nullable String fileExtension;
68+
69+
HadoopCompressionType(
70+
String value,
71+
String description,
72+
@Nullable String className,
73+
@Nullable String fileExtension) {
8474
this.value = value;
8575
this.description = description;
8676
this.className = className;
@@ -101,37 +91,31 @@ public String value() {
10191
return value;
10292
}
10393

94+
@Nullable
10495
public String hadoopCodecClassName() {
10596
return className;
10697
}
10798

99+
@Nullable
108100
public String fileExtension() {
109101
return fileExtension;
110102
}
111103

112-
public static CompressionType fromValue(String value) {
113-
if (value == null || value.isEmpty()) {
114-
return NONE;
115-
}
116-
117-
for (CompressionType type : CompressionType.values()) {
104+
public static Optional<HadoopCompressionType> fromValue(String value) {
105+
for (HadoopCompressionType type : HadoopCompressionType.values()) {
118106
if (type.value.equalsIgnoreCase(value)) {
119-
return type;
107+
return Optional.of(type);
120108
}
121109
}
122-
return NONE;
110+
return Optional.empty();
123111
}
124112

125-
/**
126-
* Check if the given extension is a supported compression extension.
127-
*
128-
* @param extension the file extension to check
129-
* @return true if the extension is a supported compression extension, false otherwise
130-
*/
131-
public static boolean isSupportedExtension(String extension) {
132-
if (extension == null || extension.isEmpty()) {
133-
return false;
113+
public static boolean isCompressExtension(String extension) {
114+
for (HadoopCompressionType type : HadoopCompressionType.values()) {
115+
if (extension.equalsIgnoreCase(type.fileExtension)) {
116+
return true;
117+
}
134118
}
135-
return SUPPORTED_EXTENSIONS.contains(extension.toLowerCase());
119+
return false;
136120
}
137121
}

paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.paimon.io;
2020

2121
import org.apache.paimon.annotation.VisibleForTesting;
22-
import org.apache.paimon.format.CompressionType;
22+
import org.apache.paimon.format.HadoopCompressionType;
2323
import org.apache.paimon.fs.ExternalPathProvider;
2424
import org.apache.paimon.fs.Path;
2525
import org.apache.paimon.manifest.FileEntry;
@@ -31,6 +31,9 @@
3131
import java.util.UUID;
3232
import java.util.concurrent.atomic.AtomicInteger;
3333

34+
import static org.apache.paimon.utils.Preconditions.checkArgument;
35+
import static org.apache.paimon.utils.StringUtils.isEmpty;
36+
3437
/** Factory which produces new {@link Path}s for data files. */
3538
@ThreadSafe
3639
public class DataFilePathFactory {
@@ -45,8 +48,8 @@ public class DataFilePathFactory {
4548
private final String dataFilePrefix;
4649
private final String changelogFilePrefix;
4750
private final boolean fileSuffixIncludeCompression;
48-
private final String fileCompression;
49-
@Nullable private final ExternalPathProvider externalPathProvider;
51+
private final @Nullable String compressExtension;
52+
private final @Nullable ExternalPathProvider externalPathProvider;
5053

5154
public DataFilePathFactory(
5255
Path parent,
@@ -63,7 +66,7 @@ public DataFilePathFactory(
6366
this.dataFilePrefix = dataFilePrefix;
6467
this.changelogFilePrefix = changelogFilePrefix;
6568
this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
66-
this.fileCompression = fileCompression;
69+
this.compressExtension = compressFileExtension(fileCompression);
6770
this.externalPathProvider = externalPathProvider;
6871
}
6972

@@ -89,14 +92,10 @@ public Path newPath(String prefix) {
8992

9093
private String newFileName(String prefix) {
9194
String extension;
92-
if (isTextFormat(formatIdentifier)) {
93-
String compressionExtension =
94-
CompressionType.fromValue(fileCompression).fileExtension();
95-
extension = "." + formatIdentifier + "." + compressionExtension;
96-
} else if (fileSuffixIncludeCompression) {
97-
String compressionExtension =
98-
CompressionType.fromValue(fileCompression).fileExtension();
99-
extension = "." + compressionExtension + "." + formatIdentifier;
95+
if (compressExtension != null && isTextFormat(formatIdentifier)) {
96+
extension = "." + formatIdentifier + "." + compressExtension;
97+
} else if (compressExtension != null && fileSuffixIncludeCompression) {
98+
extension = "." + compressExtension + "." + formatIdentifier;
10099
} else {
101100
extension = "." + formatIdentifier;
102101
}
@@ -165,20 +164,13 @@ public static Path createNewFileIndexFilePath(Path filePath) {
165164

166165
public static String formatIdentifier(String fileName) {
167166
int index = fileName.lastIndexOf('.');
168-
if (index == -1) {
169-
throw new IllegalArgumentException(fileName + " is not a legal file name.");
170-
}
167+
checkArgument(index != -1, "%s is not a legal file name.", fileName);
171168

172169
String extension = fileName.substring(index + 1);
173-
if (CompressionType.isSupportedExtension(extension)) {
170+
if (HadoopCompressionType.isCompressExtension(extension)) {
174171
int secondLastDot = fileName.lastIndexOf('.', index - 1);
175-
if (secondLastDot != -1) {
176-
String formatIdentifier = fileName.substring(secondLastDot + 1, index);
177-
// If the format is json or csv, return that instead of the compression extension
178-
if (isTextFormat(formatIdentifier)) {
179-
return formatIdentifier;
180-
}
181-
}
172+
checkArgument(secondLastDot != -1, "%s is not a legal file name.", fileName);
173+
return fileName.substring(secondLastDot + 1, index);
182174
}
183175

184176
return extension;
@@ -197,4 +189,18 @@ private static boolean isTextFormat(String formatIdentifier) {
197189
return "json".equalsIgnoreCase(formatIdentifier)
198190
|| "csv".equalsIgnoreCase(formatIdentifier);
199191
}
192+
193+
@Nullable
194+
private static String compressFileExtension(String compression) {
195+
if (isEmpty(compression)) {
196+
return null;
197+
}
198+
199+
Optional<HadoopCompressionType> hadoopOptional =
200+
HadoopCompressionType.fromValue(compression);
201+
if (hadoopOptional.isPresent()) {
202+
return hadoopOptional.get().fileExtension();
203+
}
204+
return compression;
205+
}
200206
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -65,36 +65,47 @@ protected List<String> ddl() {
6565

6666
@Test
6767
public void testCsvFileFormat() {
68-
sql("CREATE TABLE CSV (a INT, b INT, c INT) WITH ('file.format'='csv')");
69-
sql("INSERT INTO CSV VALUES (1, 2, 3)");
70-
assertThat(sql("SELECT * FROM CSV")).containsExactly(Row.of(1, 2, 3));
71-
72-
sql(
73-
"CREATE TABLE CSV_GZIP (a INT, b INT, c INT) WITH ('file.format'='csv', 'file.compression'='gzip')");
74-
sql("INSERT INTO CSV_GZIP VALUES (1, 2, 3)");
75-
assertThat(sql("SELECT * FROM CSV_GZIP")).containsExactly(Row.of(1, 2, 3));
76-
List<String> files =
77-
sql("select file_path from `CSV_GZIP$files`").stream()
78-
.map(r -> r.getField(0).toString())
79-
.collect(Collectors.toList());
80-
assertThat(files).allMatch(file -> file.endsWith("csv.gz"));
68+
innerTestTextFileFormat("csv");
8169
}
8270

8371
@Test
8472
public void testJsonFileFormat() {
85-
sql("CREATE TABLE JSON_T (a INT, b INT, c INT) WITH ('file.format'='json')");
86-
sql("INSERT INTO JSON_T VALUES (1, 2, 3)");
87-
assertThat(sql("SELECT * FROM JSON_T")).containsExactly(Row.of(1, 2, 3));
73+
innerTestTextFileFormat("json");
74+
}
75+
76+
private void innerTestTextFileFormat(String format) {
77+
// TODO zstd dependent on Hadoop 3.x
78+
// sql("CREATE TABLE TEXT_T (a INT, b INT, c INT) WITH ('file.format'='%s')",
79+
// format);
80+
// sql("INSERT INTO TEXT_T VALUES (1, 2, 3)");
81+
// assertThat(sql("SELECT * FROM TEXT_T")).containsExactly(Row.of(1, 2, 3));
82+
// List<String> files =
83+
// sql("select file_path from `TEXT_T$files`").stream()
84+
// .map(r -> r.getField(0).toString())
85+
// .collect(Collectors.toList());
86+
// assertThat(files).allMatch(file -> file.endsWith(format + ".zst"));
87+
88+
sql(
89+
"CREATE TABLE TEXT_NONE (a INT, b INT, c INT) WITH ('file.format'='%s', 'file.compression'='none')",
90+
format);
91+
sql("INSERT INTO TEXT_NONE VALUES (1, 2, 3)");
92+
assertThat(sql("SELECT * FROM TEXT_NONE")).containsExactly(Row.of(1, 2, 3));
93+
List<String> files =
94+
sql("select file_path from `TEXT_NONE$files`").stream()
95+
.map(r -> r.getField(0).toString())
96+
.collect(Collectors.toList());
97+
assertThat(files).allMatch(file -> file.endsWith(format));
8898

8999
sql(
90-
"CREATE TABLE JSON_GZIP (a INT, b INT, c INT) WITH ('file.format'='json', 'file.compression'='gzip')");
91-
sql("INSERT INTO JSON_GZIP VALUES (1, 2, 3)");
92-
assertThat(sql("SELECT * FROM JSON_GZIP")).containsExactly(Row.of(1, 2, 3));
93-
List<String> files =
94-
sql("select file_path from `JSON_GZIP$files`").stream()
100+
"CREATE TABLE TEXT_GZIP (a INT, b INT, c INT) WITH ('file.format'='%s', 'file.compression'='gzip')",
101+
format);
102+
sql("INSERT INTO TEXT_GZIP VALUES (1, 2, 3)");
103+
assertThat(sql("SELECT * FROM TEXT_GZIP")).containsExactly(Row.of(1, 2, 3));
104+
files =
105+
sql("select file_path from `TEXT_GZIP$files`").stream()
95106
.map(r -> r.getField(0).toString())
96107
.collect(Collectors.toList());
97-
assertThat(files).allMatch(file -> file.endsWith("json.gz"));
108+
assertThat(files).allMatch(file -> file.endsWith(format + ".gz"));
98109
}
99110

100111
@Test

0 commit comments

Comments (0)