Skip to content

Commit 744f636

Browse files
TyrantLuciferchao.tian
authored and
chao.tian
committed
[Feature][Connector-V2][File] Support skip corrupt files
1 parent 59cccb6 commit 744f636

File tree

4 files changed

+20
-0
lines changed

4 files changed

+20
-0
lines changed

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java

+7
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,11 @@ public class BaseSourceConfig {
126126
.enumType(CompressFormat.class)
127127
.defaultValue(CompressFormat.NONE)
128128
.withDescription("Compression codec");
129+
130+
public static final Option<Boolean> IGNORE_CORRUPT_FILES =
131+
Options.key("ignore_corrupt_files")
132+
.booleanType()
133+
.defaultValue(true)
134+
.withDescription(
135+
"Whether corrupted [orc/parquet] files are ignored during synchronization");
129136
}

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java

+5
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
7575
protected List<String> readColumns = new ArrayList<>();
7676
protected boolean isMergePartition = true;
7777
protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
78+
protected boolean ignoreCorruptFiles = BaseSourceConfig.IGNORE_CORRUPT_FILES.defaultValue();
7879
protected transient boolean isKerberosAuthorization = false;
7980

8081
protected Pattern pattern;
@@ -177,6 +178,10 @@ public void setPluginConfig(Config pluginConfig) {
177178
pluginConfig.getString(BaseSourceConfig.FILE_FILTER_PATTERN.key());
178179
this.pattern = Pattern.compile(Matcher.quoteReplacement(filterPattern));
179180
}
181+
if (pluginConfig.hasPath(BaseSourceConfig.IGNORE_CORRUPT_FILES.key())) {
182+
this.ignoreCorruptFiles =
183+
pluginConfig.getBoolean(BaseSourceConfig.IGNORE_CORRUPT_FILES.key());
184+
}
180185
}
181186

182187
@Override

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java

+4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ public class OrcReadStrategy extends AbstractReadStrategy {
7777
public void read(String path, Collector<SeaTunnelRow> output)
7878
throws FileConnectorException, IOException {
7979
if (Boolean.FALSE.equals(checkFileType(path))) {
80+
if (ignoreCorruptFiles) {
81+
log.warn("This file [{}] is corrupted, it will be ignored", path);
82+
return;
83+
}
8084
String errorMsg =
8185
String.format(
8286
"This file [%s] is not a orc file, please check the format of this file",

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java

+4
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
8484
public void read(String path, Collector<SeaTunnelRow> output)
8585
throws FileConnectorException, IOException {
8686
if (Boolean.FALSE.equals(checkFileType(path))) {
87+
if (ignoreCorruptFiles) {
88+
log.warn("This file [{}] is corrupted, it will be ignored", path);
89+
return;
90+
}
8791
String errorMsg =
8892
String.format(
8993
"This file [%s] is not a parquet file, please check the format of this file",

0 commit comments

Comments
 (0)