Skip to content

Commit 6a43e8b

Browse files
committed
[Feature][Connector-V2][File] Support skip corrupt files
1 parent 8d2fafc commit 6a43e8b

File tree

12 files changed

+65
-9
lines changed

12 files changed

+65
-9
lines changed

Diff for: docs/en/connector-v2/source/CosFile.md

+10-5
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
5353
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
5454
| time_format | string | no | HH:mm:ss |
5555
| schema | config | no | - |
56-
| common-options | | no | - |
5756
| sheet_name | string | no | - |
5857
| file_filter_pattern | string | no | - |
58+
| ignore_corrupt_files | boolean | no | true |
59+
| common-options | | no | - |
5960

6061
### path [string]
6162

@@ -240,10 +241,6 @@ The file type supported column projection as the following shown:
240241

241242
**Tips: If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured**
242243

243-
### common options
244-
245-
Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
246-
247244
### sheet_name [string]
248245

249246
Reader the sheet of the workbook,Only used when file_format is excel.
@@ -252,6 +249,14 @@ Reader the sheet of the workbook,Only used when file_format is excel.
252249

253250
Filter pattern, which used for filtering files.
254251

252+
### ignore_corrupt_files [boolean]
253+
254+
Whether corrupted [orc/parquet] files are ignored during synchronization
255+
256+
### common options
257+
258+
Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
259+
255260
## Example
256261

257262
```hocon

Diff for: docs/en/connector-v2/source/FtpFile.md

+13-4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
4949
| common-options | | no | - |
5050
| sheet_name | string | no | - |
5151
| file_filter_pattern | string | no | - |
52+
| ignore_corrupt_files | boolean | no | true |
5253

5354
### host [string]
5455

@@ -220,13 +221,21 @@ connector will generate data as the following:
220221
|---------------|-----|--------|
221222
| tyrantlucifer | 26 | male |
222223

223-
### common options
224+
### sheet_name [string]
224225

225-
Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
226+
Reader the sheet of the workbook,Only used when file_format is excel.
226227

227-
### sheet_name [string]
228+
### file_filter_pattern [string]
229+
230+
Filter pattern, which used for filtering files.
228231

229-
Reader the sheet of the workbook,Only used when file_format_type is excel.
232+
### ignore_corrupt_files [boolean]
233+
234+
Whether corrupted [orc/parquet] files are ignored during synchronization
235+
236+
### common options
237+
238+
Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
230239

231240
## Example
232241

Diff for: docs/en/connector-v2/source/HdfsFile.md

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ Read data from hdfs file system.
5757
| schema | config | no | - | the schema fields of upstream data |
5858
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
5959
| sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. |
60+
| ignore_corrupt_files | boolean | no | true | Whether corrupted [orc/parquet] files are ignored during synchronization |
6061

6162
### Tips
6263

Diff for: docs/en/connector-v2/source/LocalFile.md

+5
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
5050
| common-options | | no | - |
5151
| sheet_name | string | no | - |
5252
| file_filter_pattern | string | no | - |
53+
| ignore_corrupt_files | boolean | no | true |
5354

5455
### path [string]
5556

@@ -230,6 +231,10 @@ Reader the sheet of the workbook,Only used when file_format_type is excel.
230231

231232
Filter pattern, which used for filtering files.
232233

234+
### ignore_corrupt_files [boolean]
235+
236+
Whether corrupted [orc/parquet] files are ignored during synchronization
237+
233238
## Example
234239

235240
```hocon

Diff for: docs/en/connector-v2/source/OssFile.md

+5
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
5757
| common-options | | no | - |
5858
| sheet_name | string | no | - |
5959
| file_filter_pattern | string | no | - |
60+
| ignore_corrupt_files | boolean | no | true |
6061

6162
### path [string]
6263

@@ -249,6 +250,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-
249250

250251
Reader the sheet of the workbook,Only used when file_format_type is excel.
251252

253+
### ignore_corrupt_files [boolean]
254+
255+
Whether corrupted [orc/parquet] files are ignored during synchronization
256+
252257
## Example
253258

254259
```hocon

Diff for: docs/en/connector-v2/source/OssJindoFile.md

+5
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
6060
| common-options | | no | - |
6161
| sheet_name | string | no | - |
6262
| file_filter_pattern | string | no | - |
63+
| ignore_corrupt_files | boolean | no | true |
6364

6465
### path [string]
6566

@@ -256,6 +257,10 @@ Reader the sheet of the workbook,Only used when file_format_type is excel.
256257

257258
Filter pattern, which used for filtering files.
258259

260+
### ignore_corrupt_files [boolean]
261+
262+
Whether corrupted [orc/parquet] files are ignored during synchronization
263+
259264
## Example
260265

261266
```hocon

Diff for: docs/en/connector-v2/source/S3File.md

+1
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
214214
| schema | config | no | - | The schema of upstream data. |
215215
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
216216
| sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. |
217+
| ignore_corrupt_files | boolean | no | true | Whether corrupted [orc/parquet] files are ignored during synchronization |
217218

218219
## Example
219220

Diff for: docs/en/connector-v2/source/SftpFile.md

+5
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
4848
| common-options | | no | - |
4949
| sheet_name | string | no | - |
5050
| file_filter_pattern | string | no | - |
51+
| ignore_corrupt_files | boolean | no | true |
5152

5253
### host [string]
5354

@@ -231,6 +232,10 @@ Reader the sheet of the workbook,Only used when file_format_type is excel.
231232

232233
Filter pattern, which used for filtering files.
233234

235+
### ignore_corrupt_files [boolean]
236+
237+
Whether corrupted [orc/parquet] files are ignored during synchronization.
238+
234239
## Example
235240

236241
```hocon

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java

+7
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,11 @@ public class BaseSourceConfig {
119119
.noDefaultValue()
120120
.withDescription(
121121
"File pattern. The connector will filter some files base on the pattern.");
122+
123+
public static final Option<Boolean> IGNORE_CORRUPT_FILES =
124+
Options.key("ignore_corrupt_files")
125+
.booleanType()
126+
.defaultValue(true)
127+
.withDescription(
128+
"Whether corrupted [orc/parquet] files are ignored during synchronization");
122129
}

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
7777
protected List<String> readColumns = new ArrayList<>();
7878
protected boolean isMergePartition = true;
7979
protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
80+
protected boolean ignoreCorruptFiles = BaseSourceConfig.IGNORE_CORRUPT_FILES.defaultValue();
8081
protected transient boolean isKerberosAuthorization = false;
8182

8283
protected Pattern pattern;
@@ -187,6 +188,10 @@ public void setPluginConfig(Config pluginConfig) {
187188
pluginConfig.getString(BaseSourceConfig.FILE_FILTER_PATTERN.key());
188189
this.pattern = Pattern.compile(Matcher.quoteReplacement(filterPattern));
189190
}
191+
if (pluginConfig.hasPath(BaseSourceConfig.IGNORE_CORRUPT_FILES.key())) {
192+
this.ignoreCorruptFiles =
193+
pluginConfig.getBoolean(BaseSourceConfig.IGNORE_CORRUPT_FILES.key());
194+
}
190195
}
191196

192197
@Override

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java

+4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ public class OrcReadStrategy extends AbstractReadStrategy {
7777
public void read(String path, Collector<SeaTunnelRow> output)
7878
throws FileConnectorException, IOException {
7979
if (Boolean.FALSE.equals(checkFileType(path))) {
80+
if (ignoreCorruptFiles) {
81+
log.warn("This file [{}] is corrupted, it will be ignored", path);
82+
return;
83+
}
8084
String errorMsg =
8185
String.format(
8286
"This file [%s] is not a orc file, please check the format of this file",

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java

+4
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
8484
public void read(String path, Collector<SeaTunnelRow> output)
8585
throws FileConnectorException, IOException {
8686
if (Boolean.FALSE.equals(checkFileType(path))) {
87+
if (ignoreCorruptFiles) {
88+
log.warn("This file [{}] is corrupted, it will be ignored", path);
89+
return;
90+
}
8791
String errorMsg =
8892
String.format(
8993
"This file [%s] is not a parquet file, please check the format of this file",

0 commit comments

Comments
 (0)