Skip to content

Commit cdc79e1

Browse files
authored
Revert " [improve] update localfile connector config" (#9018)
1 parent def369a commit cdc79e1

File tree

98 files changed

+1248
-1452
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+1248
-1452
lines changed

Diff for: seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

-12
Original file line numberDiff line numberDiff line change
@@ -121,18 +121,6 @@ public void checkConnectorOptionExist() {
121121
.getNameAsString()
122122
.equals(
123123
"IncrementalSource")
124-
|| extendedType
125-
.getNameAsString()
126-
.equals(
127-
"BaseMultipleTableFileSink")
128-
|| extendedType
129-
.getNameAsString()
130-
.equals(
131-
"BaseFileSource")
132-
|| extendedType
133-
.getNameAsString()
134-
.equals(
135-
"BaseFileSink")
136124
|| extendedType
137125
.getNameAsString()
138126
.equals(

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.seatunnel.common.config.CheckConfigUtil;
2525
import org.apache.seatunnel.common.config.CheckResult;
2626
import org.apache.seatunnel.common.constants.PluginType;
27-
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
27+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
2828
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2929
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
3030
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -51,26 +51,25 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
5151
if (Objects.isNull(hadoopConf)) {
5252
hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
5353
}
54-
if (pluginConfig.hasPath(FileBaseSinkOptions.HDFS_SITE_PATH.key())) {
55-
hadoopConf.setHdfsSitePath(
56-
pluginConfig.getString(FileBaseSinkOptions.HDFS_SITE_PATH.key()));
54+
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
55+
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
5756
}
5857

59-
if (pluginConfig.hasPath(FileBaseSinkOptions.REMOTE_USER.key())) {
60-
hadoopConf.setRemoteUser(pluginConfig.getString(FileBaseSinkOptions.REMOTE_USER.key()));
58+
if (pluginConfig.hasPath(BaseSinkConfig.REMOTE_USER.key())) {
59+
hadoopConf.setRemoteUser(pluginConfig.getString(BaseSinkConfig.REMOTE_USER.key()));
6160
}
6261

63-
if (pluginConfig.hasPath(FileBaseSinkOptions.KRB5_PATH.key())) {
64-
hadoopConf.setKrb5Path(pluginConfig.getString(FileBaseSinkOptions.KRB5_PATH.key()));
62+
if (pluginConfig.hasPath(BaseSinkConfig.KRB5_PATH.key())) {
63+
hadoopConf.setKrb5Path(pluginConfig.getString(BaseSinkConfig.KRB5_PATH.key()));
6564
}
6665

67-
if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key())) {
66+
if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
6867
hadoopConf.setKerberosPrincipal(
69-
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key()));
68+
pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
7069
}
71-
if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key())) {
70+
if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())) {
7271
hadoopConf.setKerberosKeytabPath(
73-
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key()));
72+
pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
7473
}
7574
}
7675
}

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfigOptions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
22-
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
22+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
2323

2424
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
2525

26-
public class HdfsSourceConfigOptions extends FileBaseSourceOptions {
26+
public class HdfsSourceConfigOptions extends BaseSourceConfigOptions {
2727
public static final Option<String> DEFAULT_FS =
2828
Options.key(FS_DEFAULT_NAME_KEY)
2929
.stringType()

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java

+42-47
Original file line numberDiff line numberDiff line change
@@ -37,104 +37,99 @@
3737
@Data
3838
public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
3939
private static final long serialVersionUID = 1L;
40-
protected CompressFormat compressFormat = FileBaseSinkOptions.COMPRESS_CODEC.defaultValue();
41-
protected String fieldDelimiter = FileBaseSinkOptions.FIELD_DELIMITER.defaultValue();
42-
protected String rowDelimiter = FileBaseSinkOptions.ROW_DELIMITER.defaultValue();
43-
protected int batchSize = FileBaseSinkOptions.BATCH_SIZE.defaultValue();
40+
protected CompressFormat compressFormat = BaseSinkConfig.COMPRESS_CODEC.defaultValue();
41+
protected String fieldDelimiter = BaseSinkConfig.FIELD_DELIMITER.defaultValue();
42+
protected String rowDelimiter = BaseSinkConfig.ROW_DELIMITER.defaultValue();
43+
protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
4444
protected String path;
45-
protected String fileNameExpression = FileBaseSinkOptions.FILE_NAME_EXPRESSION.defaultValue();
46-
protected boolean singleFileMode = FileBaseSinkOptions.SINGLE_FILE_MODE.defaultValue();
45+
protected String fileNameExpression = BaseSinkConfig.FILE_NAME_EXPRESSION.defaultValue();
46+
protected boolean singleFileMode = BaseSinkConfig.SINGLE_FILE_MODE.defaultValue();
4747
protected boolean createEmptyFileWhenNoData =
48-
FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
48+
BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
4949
protected FileFormat fileFormat;
50-
protected String filenameExtension = FileBaseSinkOptions.FILENAME_EXTENSION.defaultValue();
50+
protected String filenameExtension = BaseSinkConfig.FILENAME_EXTENSION.defaultValue();
5151
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
5252
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
5353
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
5454
protected Boolean enableHeaderWriter = false;
5555

5656
public BaseFileSinkConfig(@NonNull Config config) {
57-
if (config.hasPath(FileBaseSinkOptions.COMPRESS_CODEC.key())) {
58-
String compressCodec = config.getString(FileBaseSinkOptions.COMPRESS_CODEC.key());
57+
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
58+
String compressCodec = config.getString(BaseSinkConfig.COMPRESS_CODEC.key());
5959
this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
6060
}
61-
if (config.hasPath(FileBaseSinkOptions.BATCH_SIZE.key())) {
62-
this.batchSize = config.getInt(FileBaseSinkOptions.BATCH_SIZE.key());
61+
if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
62+
this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
6363
}
64-
if (config.hasPath(FileBaseSinkOptions.FIELD_DELIMITER.key())
65-
&& StringUtils.isNotEmpty(
66-
config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key()))) {
67-
this.fieldDelimiter = config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key());
64+
if (config.hasPath(BaseSinkConfig.FIELD_DELIMITER.key())
65+
&& StringUtils.isNotEmpty(config.getString(BaseSinkConfig.FIELD_DELIMITER.key()))) {
66+
this.fieldDelimiter = config.getString(BaseSinkConfig.FIELD_DELIMITER.key());
6867
}
6968

70-
if (config.hasPath(FileBaseSinkOptions.ROW_DELIMITER.key())) {
71-
this.rowDelimiter = config.getString(FileBaseSinkOptions.ROW_DELIMITER.key());
69+
if (config.hasPath(BaseSinkConfig.ROW_DELIMITER.key())) {
70+
this.rowDelimiter = config.getString(BaseSinkConfig.ROW_DELIMITER.key());
7271
}
7372

74-
if (config.hasPath(FileBaseSinkOptions.FILE_PATH.key())
75-
&& !StringUtils.isBlank(config.getString(FileBaseSinkOptions.FILE_PATH.key()))) {
76-
this.path = config.getString(FileBaseSinkOptions.FILE_PATH.key());
73+
if (config.hasPath(BaseSinkConfig.FILE_PATH.key())
74+
&& !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_PATH.key()))) {
75+
this.path = config.getString(BaseSinkConfig.FILE_PATH.key());
7776
}
7877
checkNotNull(path);
7978

8079
if (path.equals(File.separator)) {
8180
this.path = "";
8281
}
8382

84-
if (config.hasPath(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key())
83+
if (config.hasPath(BaseSinkConfig.FILE_NAME_EXPRESSION.key())
8584
&& !StringUtils.isBlank(
86-
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key()))) {
87-
this.fileNameExpression =
88-
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key());
85+
config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key()))) {
86+
this.fileNameExpression = config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key());
8987
}
9088

91-
if (config.hasPath(FileBaseSinkOptions.SINGLE_FILE_MODE.key())) {
92-
this.singleFileMode = config.getBoolean(FileBaseSinkOptions.SINGLE_FILE_MODE.key());
89+
if (config.hasPath(BaseSinkConfig.SINGLE_FILE_MODE.key())) {
90+
this.singleFileMode = config.getBoolean(BaseSinkConfig.SINGLE_FILE_MODE.key());
9391
}
9492

95-
if (config.hasPath(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
93+
if (config.hasPath(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
9694
this.createEmptyFileWhenNoData =
97-
config.getBoolean(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
95+
config.getBoolean(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
9896
}
9997

100-
if (config.hasPath(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
101-
&& !StringUtils.isBlank(
102-
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key()))) {
98+
if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key())
99+
&& !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
103100
this.fileFormat =
104101
FileFormat.valueOf(
105-
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
102+
config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key())
106103
.toUpperCase(Locale.ROOT));
107104
} else {
108105
// fall back to the default
109-
this.fileFormat = FileBaseSinkOptions.FILE_FORMAT_TYPE.defaultValue();
106+
this.fileFormat = BaseSinkConfig.FILE_FORMAT_TYPE.defaultValue();
110107
}
111108

112-
if (config.hasPath(FileBaseSinkOptions.FILENAME_EXTENSION.key())
109+
if (config.hasPath(BaseSinkConfig.FILENAME_EXTENSION.key())
113110
&& !StringUtils.isBlank(
114-
config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key()))) {
115-
this.filenameExtension = config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key());
111+
config.getString(BaseSinkConfig.FILENAME_EXTENSION.key()))) {
112+
this.filenameExtension = config.getString(BaseSinkConfig.FILENAME_EXTENSION.key());
116113
}
117114

118-
if (config.hasPath(FileBaseSinkOptions.DATE_FORMAT.key())) {
115+
if (config.hasPath(BaseSinkConfig.DATE_FORMAT.key())) {
119116
dateFormat =
120-
DateUtils.Formatter.parse(
121-
config.getString(FileBaseSinkOptions.DATE_FORMAT.key()));
117+
DateUtils.Formatter.parse(config.getString(BaseSinkConfig.DATE_FORMAT.key()));
122118
}
123119

124-
if (config.hasPath(FileBaseSinkOptions.DATETIME_FORMAT.key())) {
120+
if (config.hasPath(BaseSinkConfig.DATETIME_FORMAT.key())) {
125121
datetimeFormat =
126122
DateTimeUtils.Formatter.parse(
127-
config.getString(FileBaseSinkOptions.DATETIME_FORMAT.key()));
123+
config.getString(BaseSinkConfig.DATETIME_FORMAT.key()));
128124
}
129125

130-
if (config.hasPath(FileBaseSinkOptions.TIME_FORMAT.key())) {
126+
if (config.hasPath(BaseSinkConfig.TIME_FORMAT.key())) {
131127
timeFormat =
132-
TimeUtils.Formatter.parse(
133-
config.getString(FileBaseSinkOptions.TIME_FORMAT.key()));
128+
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
134129
}
135130

136-
if (config.hasPath(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key())) {
137-
enableHeaderWriter = config.getBoolean(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key());
131+
if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
132+
enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
138133
}
139134
}
140135

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public abstract class BaseFileSourceConfig implements Serializable {
5959

6060
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
6161
this.baseFileSourceConfig = readonlyConfig;
62-
this.fileFormat = readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
62+
this.fileFormat = readonlyConfig.get(BaseSourceConfigOptions.FILE_FORMAT_TYPE);
6363
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, getHadoopConfig());
6464
this.filePaths = parseFilePaths(readonlyConfig);
6565

@@ -69,7 +69,7 @@ public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
6969
private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
7070
String rootPath = null;
7171
try {
72-
rootPath = readonlyConfig.get(FileBaseSourceOptions.FILE_PATH);
72+
rootPath = readonlyConfig.get(BaseSourceConfigOptions.FILE_PATH);
7373
return readStrategy.getFileNamesByPath(rootPath);
7474
} catch (Exception ex) {
7575
String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
3636
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
3737

38-
public class FileBaseSinkOptions extends FileBaseOptions {
38+
public class BaseSinkConfig {
3939
public static final String SEATUNNEL = "seatunnel";
4040
public static final String NON_PARTITION = "NON_PARTITION";
4141
public static final String TRANSACTION_ID_SPLIT = "_";
@@ -214,6 +214,13 @@ public class FileBaseSinkOptions extends FileBaseOptions {
214214
.defaultValue(FileFormat.CSV)
215215
.withDescription("File format type, e.g. csv, orc, parquet, text");
216216

217+
public static final Option<String> FILENAME_EXTENSION =
218+
Options.key("filename_extension")
219+
.stringType()
220+
.noDefaultValue()
221+
.withDescription(
222+
"Override the default file name extensions with custom file name extensions. E.g. `.xml`, `.json`, `dat`, `.customtype`");
223+
217224
public static final Option<String> ENCODING =
218225
Options.key("encoding")
219226
.stringType()

0 commit comments

Comments
 (0)