Skip to content

Commit ec8919a

Browse files
yzeng1618 and zengyi authored
[Bug][Connector-V2][File] Fix IndexOutOfBoundsException when reading empty directories (#10373)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent 48968fb commit ec8919a

File tree

8 files changed

+186
-11
lines changed

8 files changed

+186
-11
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void init(HadoopConf conf) {
136136
public void setCatalogTable(CatalogTable catalogTable) {
137137
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
138138
this.seaTunnelRowTypeWithPartition =
139-
mergePartitionTypes(fileNames.get(0), catalogTable.getSeaTunnelRowType());
139+
mergePartitionTypes(getPathForPartitionInference(null), this.seaTunnelRowType);
140140
}
141141

142142
boolean checkFileType(String path) {
@@ -427,13 +427,26 @@ protected void readProcess(
427427

428428
protected Map<String, String> parsePartitionsByPath(String path) {
429429
LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
430+
if (StringUtils.isBlank(path)) {
431+
return partitions;
432+
}
430433
Arrays.stream(path.split("/", -1))
431434
.filter(split -> split.contains("="))
432435
.map(split -> split.split("=", -1))
433436
.forEach(kv -> partitions.put(kv[0], kv[1]));
434437
return partitions;
435438
}
436439

440+
protected String getPathForPartitionInference(String fallbackPath) {
441+
if (!fileNames.isEmpty()) {
442+
return fileNames.get(0);
443+
}
444+
if (StringUtils.isNotBlank(fallbackPath)) {
445+
return fallbackPath;
446+
}
447+
return sourceRootPath;
448+
}
449+
437450
protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType seaTunnelRowType) {
438451
Map<String, String> partitionsMap = parsePartitionsByPath(path);
439452
if (partitionsMap.isEmpty()) {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ private List<String> getHeaders(CSVParser csvParser) {
223223
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
224224
this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
225225
this.seaTunnelRowTypeWithPartition =
226-
mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
226+
mergePartitionTypes(getPathForPartitionInference(path), seaTunnelRowType);
227227
initFormatter();
228228
if (pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
229229
throw new FileConnectorException(
@@ -256,8 +256,9 @@ private String getDelimiter() {
256256
public void setCatalogTable(CatalogTable catalogTable) {
257257
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
258258
this.inputCatalogTable = catalogTable;
259+
String partitionPath = getPathForPartitionInference(null);
259260
SeaTunnelRowType userDefinedRowTypeWithPartition =
260-
mergePartitionTypes(fileNames.get(0), rowType);
261+
mergePartitionTypes(partitionPath, rowType);
261262
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
262263
encoding =
263264
readonlyConfig
@@ -295,7 +296,7 @@ public void setCatalogTable(CatalogTable catalogTable) {
295296
}
296297
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
297298
this.seaTunnelRowTypeWithPartition =
298-
mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
299+
mergePartitionTypes(partitionPath, this.seaTunnelRowType);
299300
} else {
300301
this.seaTunnelRowType = rowType;
301302
this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,9 @@ public void setCatalogTable(CatalogTable catalogTable) {
218218
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
219219
"Schema information is not set or incorrect Schema settings");
220220
}
221+
String partitionPath = getPathForPartitionInference(null);
221222
SeaTunnelRowType userDefinedRowTypeWithPartition =
222-
mergePartitionTypes(fileNames.get(0), rowType);
223+
mergePartitionTypes(partitionPath, rowType);
223224
// column projection
224225
if (pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
225226
// get the read column index from user-defined row type
@@ -233,7 +234,7 @@ public void setCatalogTable(CatalogTable catalogTable) {
233234
}
234235
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
235236
this.seaTunnelRowTypeWithPartition =
236-
mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
237+
mergePartitionTypes(partitionPath, this.seaTunnelRowType);
237238
} else {
238239
this.seaTunnelRowType = rowType;
239240
this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ private void processLineData(
273273
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
274274
this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
275275
this.seaTunnelRowTypeWithPartition =
276-
mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
276+
mergePartitionTypes(getPathForPartitionInference(path), seaTunnelRowType);
277277
initFormatter();
278278
if (pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
279279
throw new FileConnectorException(
@@ -302,8 +302,9 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
302302
@Override
303303
public void setCatalogTable(CatalogTable catalogTable) {
304304
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
305+
String partitionPath = getPathForPartitionInference(null);
305306
SeaTunnelRowType userDefinedRowTypeWithPartition =
306-
mergePartitionTypes(fileNames.get(0), rowType);
307+
mergePartitionTypes(partitionPath, rowType);
307308
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
308309
Optional<String> fieldDelimiterOptional =
309310
readonlyConfig.getOptional(FileBaseSourceOptions.FIELD_DELIMITER);
@@ -343,7 +344,7 @@ public void setCatalogTable(CatalogTable catalogTable) {
343344
}
344345
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
345346
this.seaTunnelRowTypeWithPartition =
346-
mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
347+
mergePartitionTypes(partitionPath, this.seaTunnelRowType);
347348
} else {
348349
this.seaTunnelRowType = rowType;
349350
this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,10 @@ public void setCatalogTable(CatalogTable catalogTable) {
186186
"Schema information is undefined or misconfigured, please check your configuration file.");
187187
}
188188

189+
String partitionPath = getPathForPartitionInference(null);
189190
if (readColumns.isEmpty()) {
190191
this.seaTunnelRowType = rowType;
191-
this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), rowType);
192+
this.seaTunnelRowTypeWithPartition = mergePartitionTypes(partitionPath, rowType);
192193
} else {
193194
if (readColumns.retainAll(Arrays.asList(rowType.getFieldNames()))) {
194195
log.warn(
@@ -205,7 +206,7 @@ public void setCatalogTable(CatalogTable catalogTable) {
205206
}
206207
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
207208
this.seaTunnelRowTypeWithPartition =
208-
mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
209+
mergePartitionTypes(partitionPath, this.seaTunnelRowType);
209210
}
210211
}
211212

seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
1919

20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
22+
23+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
24+
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
25+
import org.apache.seatunnel.api.table.type.BasicType;
26+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
27+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
28+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
2029
import org.apache.seatunnel.connectors.seatunnel.file.writer.ParquetReadStrategyTest;
2130

2231
import org.apache.avro.Schema;
@@ -40,7 +49,9 @@
4049
import java.io.IOException;
4150
import java.text.SimpleDateFormat;
4251
import java.util.Date;
52+
import java.util.HashMap;
4353
import java.util.List;
54+
import java.util.Map;
4455

4556
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
4657

@@ -227,4 +238,86 @@ void testOnlyStartDateOutOfRange() throws Exception {
227238
Assertions.assertTrue(result);
228239
}
229240
}
241+
242+
@Test
243+
public void testSetCatalogTableShouldNotThrowWhenFileListIsEmpty() {
244+
Config pluginConfig = ConfigFactory.parseMap(buildBasePluginConfigWithPartitions());
245+
CatalogTable catalogTable = buildCatalogTable();
246+
247+
Assertions.assertAll(
248+
() -> {
249+
try (ReadStrategy strategy = new TextReadStrategy()) {
250+
assertSetCatalogTableWithEmptyFileNames(
251+
strategy, pluginConfig, catalogTable);
252+
}
253+
},
254+
() -> {
255+
try (ReadStrategy strategy = new CsvReadStrategy()) {
256+
assertSetCatalogTableWithEmptyFileNames(
257+
strategy, pluginConfig, catalogTable);
258+
}
259+
},
260+
() -> {
261+
try (ReadStrategy strategy = new ExcelReadStrategy()) {
262+
assertSetCatalogTableWithEmptyFileNames(
263+
strategy, pluginConfig, catalogTable);
264+
}
265+
},
266+
() -> {
267+
try (ReadStrategy strategy = new XmlReadStrategy()) {
268+
assertSetCatalogTableWithEmptyFileNames(
269+
strategy, pluginConfig, catalogTable);
270+
}
271+
},
272+
() -> {
273+
try (ReadStrategy strategy = new JsonReadStrategy()) {
274+
assertSetCatalogTableWithEmptyFileNames(
275+
strategy, pluginConfig, catalogTable);
276+
}
277+
});
278+
}
279+
280+
@Test
281+
public void testGetSeaTunnelRowTypeInfoShouldNotThrowWhenFileListIsEmpty() throws Exception {
282+
Config pluginConfig = ConfigFactory.parseMap(buildBasePluginConfigWithPartitions());
283+
284+
try (TextReadStrategy textReadStrategy = new TextReadStrategy()) {
285+
textReadStrategy.setPluginConfig(pluginConfig);
286+
SeaTunnelRowType textRowType =
287+
Assertions.assertDoesNotThrow(
288+
() -> textReadStrategy.getSeaTunnelRowTypeInfo("/tmp/dt=2024-01-01"));
289+
Assertions.assertEquals(
290+
"dt", textRowType.getFieldNames()[textRowType.getTotalFields() - 1]);
291+
}
292+
293+
try (CsvReadStrategy csvReadStrategy = new CsvReadStrategy()) {
294+
csvReadStrategy.setPluginConfig(pluginConfig);
295+
SeaTunnelRowType csvRowType =
296+
Assertions.assertDoesNotThrow(
297+
() -> csvReadStrategy.getSeaTunnelRowTypeInfo("/tmp/dt=2024-01-01"));
298+
Assertions.assertEquals(
299+
"dt", csvRowType.getFieldNames()[csvRowType.getTotalFields() - 1]);
300+
}
301+
}
302+
303+
private static Map<String, Object> buildBasePluginConfigWithPartitions() {
304+
Map<String, Object> config = new HashMap<>();
305+
config.put(FileBaseSourceOptions.FILE_PATH.key(), "/tmp/dt=2024-01-01");
306+
return config;
307+
}
308+
309+
private static CatalogTable buildCatalogTable() {
310+
SeaTunnelRowType rowType =
311+
new SeaTunnelRowType(
312+
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.INT_TYPE});
313+
return CatalogTableUtil.getCatalogTable("test", rowType);
314+
}
315+
316+
private static void assertSetCatalogTableWithEmptyFileNames(
317+
ReadStrategy readStrategy, Config pluginConfig, CatalogTable catalogTable) {
318+
readStrategy.setPluginConfig(pluginConfig);
319+
Assertions.assertDoesNotThrow(() -> readStrategy.setCatalogTable(catalogTable));
320+
SeaTunnelRowType actualRowType = readStrategy.getActualSeaTunnelRowTypeInfo();
321+
Assertions.assertArrayEquals(new String[] {"id", "dt"}, actualRowType.getFieldNames());
322+
}
230323
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,19 @@ public void testHdfsRead(TestContainer container) throws IOException, Interrupte
129129
Assertions.assertEquals(0, readResult.getExitCode());
130130
}
131131

132+
@TestTemplate
133+
public void testHdfsReadEmptyTextDirectory(TestContainer container)
134+
throws IOException, InterruptedException {
135+
nameNode.execInContainer("bash", "-c", "hdfs dfs -rm -r -f /empty/text || true");
136+
org.testcontainers.containers.Container.ExecResult mkdirResult =
137+
nameNode.execInContainer("hdfs", "dfs", "-mkdir", "-p", "/empty/text");
138+
Assertions.assertEquals(0, mkdirResult.getExitCode());
139+
140+
org.testcontainers.containers.Container.ExecResult readResult =
141+
container.executeJob("/hdfs_empty_text_to_assert.conf");
142+
Assertions.assertEquals(0, readResult.getExitCode());
143+
}
144+
132145
@TestTemplate
133146
public void testHdfsBinaryUpdateModeDistcp(TestContainer container)
134147
throws IOException, InterruptedException {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source {
24+
HdfsFile {
25+
fs.defaultFS = "hdfs://namenode1:9000"
26+
path = "/empty/text"
27+
file_format_type = "text"
28+
schema = {
29+
fields {
30+
id = int
31+
name = string
32+
}
33+
}
34+
hadoop_conf = {
35+
"dfs.replication" = 1
36+
}
37+
}
38+
}
39+
40+
sink {
41+
Assert {
42+
rules {
43+
row_rules = [
44+
{
45+
rule_type = MAX_ROW
46+
rule_value = 0
47+
}
48+
]
49+
}
50+
}
51+
}
52+

0 commit comments

Comments (0)