Skip to content

Commit bf25292

Browse files
authored
[Improve] File source refactor (#10758)
1 parent 0a2de13 commit bf25292

43 files changed

Lines changed: 535 additions & 862 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java

Lines changed: 0 additions & 159 deletions
This file was deleted.

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
import java.util.List;
2525

26+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
27+
2628
public class FileBaseOptions extends ConnectorCommonOptions {
2729

2830
public static final Option<String> FILENAME_EXTENSION =
@@ -165,4 +167,10 @@ public class FileBaseOptions extends ConnectorCommonOptions {
165167
"File split size in bytes when enable_file_split=true. Must be greater than 0. "
166168
+ "For text-like formats, the split end will be aligned to the next row_delimiter. "
167169
+ "Default is 128MB (128*1024*1024).");
170+
171+
public static final Option<String> DEFAULT_FS =
172+
Options.key(FS_DEFAULT_NAME_KEY)
173+
.stringType()
174+
.noDefaultValue()
175+
.withDescription("HDFS namenode host");
168176
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,125 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.source;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23+
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
2024
import org.apache.seatunnel.api.source.Boundedness;
2125
import org.apache.seatunnel.api.source.SeaTunnelSource;
2226
import org.apache.seatunnel.api.source.SourceReader;
2327
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
2428
import org.apache.seatunnel.api.source.SupportColumnProjection;
2529
import org.apache.seatunnel.api.source.SupportParallelism;
26-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
30+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
31+
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
2732
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2833
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
34+
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
35+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
36+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
2937
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
38+
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
39+
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
3040
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
3141
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
3242
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplitEnumerator;
3343
import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
3444

45+
import java.io.IOException;
46+
import java.util.Collections;
3547
import java.util.List;
3648

3749
public abstract class BaseFileSource
3850
implements SeaTunnelSource<SeaTunnelRow, FileSourceSplit, FileSourceState>,
3951
SupportParallelism,
4052
SupportColumnProjection {
41-
protected SeaTunnelRowType rowType;
42-
protected ReadStrategy readStrategy;
43-
protected HadoopConf hadoopConf;
44-
protected List<String> filePaths;
53+
protected ReadonlyConfig pluginConfig;
54+
private final CatalogTable catalogTable;
55+
private final List<String> filePaths;
56+
private final ReadStrategy readStrategy;
57+
58+
/**
 * No-op constructor intended exclusively for unit tests; it leaves every field
 * unset ({@code null}). Production code must use
 * {@link #BaseFileSource(ReadonlyConfig)} instead.
 */
@VisibleForTesting
public BaseFileSource() {
    this.readStrategy = null;
    this.filePaths = null;
    this.catalogTable = null;
}
65+
66+
public BaseFileSource(ReadonlyConfig pluginConfig) {
67+
this.pluginConfig = pluginConfig;
68+
HadoopConf hadoopConf = initHadoopConf();
69+
this.readStrategy =
70+
pluginConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE).getReadStrategy();
71+
this.readStrategy.setPluginConfig(pluginConfig.toConfig());
72+
this.readStrategy.init(hadoopConf);
73+
String path = pluginConfig.get(FileBaseSourceOptions.FILE_PATH);
74+
try {
75+
filePaths = readStrategy.getFileNamesByPath(path);
76+
} catch (IOException e) {
77+
String errorMsg = String.format("Get file list from this path [%s] failed", path);
78+
throw new FileConnectorException(
79+
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
80+
}
81+
82+
// support user-defined schema
83+
CatalogTable userDefinedCatalogTable;
84+
FileFormat fileFormat = pluginConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
85+
// only json text csv type support user-defined schema now
86+
if (pluginConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
87+
switch (fileFormat) {
88+
case CSV:
89+
case TEXT:
90+
case JSON:
91+
case EXCEL:
92+
case XML:
93+
userDefinedCatalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
94+
readStrategy.setCatalogTable(userDefinedCatalogTable);
95+
break;
96+
case ORC:
97+
case PARQUET:
98+
case BINARY:
99+
throw new FileConnectorException(
100+
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
101+
"SeaTunnel does not support user-defined schema for [parquet, orc, binary] files");
102+
default:
103+
// never got in there
104+
throw new FileConnectorException(
105+
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
106+
"SeaTunnel does not supported this file format");
107+
}
108+
} else {
109+
if (filePaths.isEmpty() && fileFormat != FileFormat.BINARY) {
110+
// When the directory is empty, distribute default behavior schema
111+
userDefinedCatalogTable = CatalogTableUtil.buildSimpleTextTable();
112+
} else {
113+
try {
114+
SeaTunnelRowType rowType =
115+
readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
116+
userDefinedCatalogTable = CatalogTableUtil.getCatalogTable("default", rowType);
117+
} catch (FileConnectorException e) {
118+
String errorMsg =
119+
String.format(
120+
"Get table schema from file [%s] failed", filePaths.get(0));
121+
throw new FileConnectorException(
122+
CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
123+
}
124+
}
125+
}
126+
this.catalogTable = userDefinedCatalogTable;
127+
}
128+
129+
/** Subclasses supply the filesystem configuration (e.g. HDFS, local, cloud storage) used to initialize the read strategy. */
protected abstract HadoopConf initHadoopConf();
45130

46131
@Override
47132
public Boundedness getBoundedness() {
48133
return Boundedness.BOUNDED;
49134
}
50135

51136
@Override
52-
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
53-
return rowType;
137+
public List<CatalogTable> getProducedCatalogTables() {
138+
return Collections.singletonList(catalogTable);
54139
}
55140

56141
@Override

0 commit comments

Comments (0)