Skip to content

Commit def369a

Browse files
authored
[improve] update localfile connector config (#8765)
Co-authored-by: zeng <gientech.com>
1 parent bde2350 commit def369a

File tree

98 files changed

+1452
-1248
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+1452
-1248
lines changed

Diff for: seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

+12
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,18 @@ public void checkConnectorOptionExist() {
121121
.getNameAsString()
122122
.equals(
123123
"IncrementalSource")
124+
|| extendedType
125+
.getNameAsString()
126+
.equals(
127+
"BaseMultipleTableFileSink")
128+
|| extendedType
129+
.getNameAsString()
130+
.equals(
131+
"BaseFileSource")
132+
|| extendedType
133+
.getNameAsString()
134+
.equals(
135+
"BaseFileSink")
124136
|| extendedType
125137
.getNameAsString()
126138
.equals(

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java

+12-11
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.seatunnel.common.config.CheckConfigUtil;
2525
import org.apache.seatunnel.common.config.CheckResult;
2626
import org.apache.seatunnel.common.constants.PluginType;
27-
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
27+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
2828
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2929
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
3030
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -51,25 +51,26 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
5151
if (Objects.isNull(hadoopConf)) {
5252
hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
5353
}
54-
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
55-
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
54+
if (pluginConfig.hasPath(FileBaseSinkOptions.HDFS_SITE_PATH.key())) {
55+
hadoopConf.setHdfsSitePath(
56+
pluginConfig.getString(FileBaseSinkOptions.HDFS_SITE_PATH.key()));
5657
}
5758

58-
if (pluginConfig.hasPath(BaseSinkConfig.REMOTE_USER.key())) {
59-
hadoopConf.setRemoteUser(pluginConfig.getString(BaseSinkConfig.REMOTE_USER.key()));
59+
if (pluginConfig.hasPath(FileBaseSinkOptions.REMOTE_USER.key())) {
60+
hadoopConf.setRemoteUser(pluginConfig.getString(FileBaseSinkOptions.REMOTE_USER.key()));
6061
}
6162

62-
if (pluginConfig.hasPath(BaseSinkConfig.KRB5_PATH.key())) {
63-
hadoopConf.setKrb5Path(pluginConfig.getString(BaseSinkConfig.KRB5_PATH.key()));
63+
if (pluginConfig.hasPath(FileBaseSinkOptions.KRB5_PATH.key())) {
64+
hadoopConf.setKrb5Path(pluginConfig.getString(FileBaseSinkOptions.KRB5_PATH.key()));
6465
}
6566

66-
if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
67+
if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key())) {
6768
hadoopConf.setKerberosPrincipal(
68-
pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
69+
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key()));
6970
}
70-
if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())) {
71+
if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key())) {
7172
hadoopConf.setKerberosKeytabPath(
72-
pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
73+
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key()));
7374
}
7475
}
7576
}

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/config/HdfsSourceConfigOptions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
22-
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
22+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
2323

2424
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
2525

26-
public class HdfsSourceConfigOptions extends BaseSourceConfigOptions {
26+
public class HdfsSourceConfigOptions extends FileBaseSourceOptions {
2727
public static final Option<String> DEFAULT_FS =
2828
Options.key(FS_DEFAULT_NAME_KEY)
2929
.stringType()

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java

+47-42
Original file line numberDiff line numberDiff line change
@@ -37,99 +37,104 @@
3737
@Data
3838
public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
3939
private static final long serialVersionUID = 1L;
40-
protected CompressFormat compressFormat = BaseSinkConfig.COMPRESS_CODEC.defaultValue();
41-
protected String fieldDelimiter = BaseSinkConfig.FIELD_DELIMITER.defaultValue();
42-
protected String rowDelimiter = BaseSinkConfig.ROW_DELIMITER.defaultValue();
43-
protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
40+
protected CompressFormat compressFormat = FileBaseSinkOptions.COMPRESS_CODEC.defaultValue();
41+
protected String fieldDelimiter = FileBaseSinkOptions.FIELD_DELIMITER.defaultValue();
42+
protected String rowDelimiter = FileBaseSinkOptions.ROW_DELIMITER.defaultValue();
43+
protected int batchSize = FileBaseSinkOptions.BATCH_SIZE.defaultValue();
4444
protected String path;
45-
protected String fileNameExpression = BaseSinkConfig.FILE_NAME_EXPRESSION.defaultValue();
46-
protected boolean singleFileMode = BaseSinkConfig.SINGLE_FILE_MODE.defaultValue();
45+
protected String fileNameExpression = FileBaseSinkOptions.FILE_NAME_EXPRESSION.defaultValue();
46+
protected boolean singleFileMode = FileBaseSinkOptions.SINGLE_FILE_MODE.defaultValue();
4747
protected boolean createEmptyFileWhenNoData =
48-
BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
48+
FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
4949
protected FileFormat fileFormat;
50-
protected String filenameExtension = BaseSinkConfig.FILENAME_EXTENSION.defaultValue();
50+
protected String filenameExtension = FileBaseSinkOptions.FILENAME_EXTENSION.defaultValue();
5151
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
5252
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
5353
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
5454
protected Boolean enableHeaderWriter = false;
5555

5656
public BaseFileSinkConfig(@NonNull Config config) {
57-
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
58-
String compressCodec = config.getString(BaseSinkConfig.COMPRESS_CODEC.key());
57+
if (config.hasPath(FileBaseSinkOptions.COMPRESS_CODEC.key())) {
58+
String compressCodec = config.getString(FileBaseSinkOptions.COMPRESS_CODEC.key());
5959
this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
6060
}
61-
if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
62-
this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
61+
if (config.hasPath(FileBaseSinkOptions.BATCH_SIZE.key())) {
62+
this.batchSize = config.getInt(FileBaseSinkOptions.BATCH_SIZE.key());
6363
}
64-
if (config.hasPath(BaseSinkConfig.FIELD_DELIMITER.key())
65-
&& StringUtils.isNotEmpty(config.getString(BaseSinkConfig.FIELD_DELIMITER.key()))) {
66-
this.fieldDelimiter = config.getString(BaseSinkConfig.FIELD_DELIMITER.key());
64+
if (config.hasPath(FileBaseSinkOptions.FIELD_DELIMITER.key())
65+
&& StringUtils.isNotEmpty(
66+
config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key()))) {
67+
this.fieldDelimiter = config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key());
6768
}
6869

69-
if (config.hasPath(BaseSinkConfig.ROW_DELIMITER.key())) {
70-
this.rowDelimiter = config.getString(BaseSinkConfig.ROW_DELIMITER.key());
70+
if (config.hasPath(FileBaseSinkOptions.ROW_DELIMITER.key())) {
71+
this.rowDelimiter = config.getString(FileBaseSinkOptions.ROW_DELIMITER.key());
7172
}
7273

73-
if (config.hasPath(BaseSinkConfig.FILE_PATH.key())
74-
&& !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_PATH.key()))) {
75-
this.path = config.getString(BaseSinkConfig.FILE_PATH.key());
74+
if (config.hasPath(FileBaseSinkOptions.FILE_PATH.key())
75+
&& !StringUtils.isBlank(config.getString(FileBaseSinkOptions.FILE_PATH.key()))) {
76+
this.path = config.getString(FileBaseSinkOptions.FILE_PATH.key());
7677
}
7778
checkNotNull(path);
7879

7980
if (path.equals(File.separator)) {
8081
this.path = "";
8182
}
8283

83-
if (config.hasPath(BaseSinkConfig.FILE_NAME_EXPRESSION.key())
84+
if (config.hasPath(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key())
8485
&& !StringUtils.isBlank(
85-
config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key()))) {
86-
this.fileNameExpression = config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key());
86+
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key()))) {
87+
this.fileNameExpression =
88+
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key());
8789
}
8890

89-
if (config.hasPath(BaseSinkConfig.SINGLE_FILE_MODE.key())) {
90-
this.singleFileMode = config.getBoolean(BaseSinkConfig.SINGLE_FILE_MODE.key());
91+
if (config.hasPath(FileBaseSinkOptions.SINGLE_FILE_MODE.key())) {
92+
this.singleFileMode = config.getBoolean(FileBaseSinkOptions.SINGLE_FILE_MODE.key());
9193
}
9294

93-
if (config.hasPath(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
95+
if (config.hasPath(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
9496
this.createEmptyFileWhenNoData =
95-
config.getBoolean(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
97+
config.getBoolean(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
9698
}
9799

98-
if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key())
99-
&& !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
100+
if (config.hasPath(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
101+
&& !StringUtils.isBlank(
102+
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key()))) {
100103
this.fileFormat =
101104
FileFormat.valueOf(
102-
config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key())
105+
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
103106
.toUpperCase(Locale.ROOT));
104107
} else {
105108
// fall back to the default
106-
this.fileFormat = BaseSinkConfig.FILE_FORMAT_TYPE.defaultValue();
109+
this.fileFormat = FileBaseSinkOptions.FILE_FORMAT_TYPE.defaultValue();
107110
}
108111

109-
if (config.hasPath(BaseSinkConfig.FILENAME_EXTENSION.key())
112+
if (config.hasPath(FileBaseSinkOptions.FILENAME_EXTENSION.key())
110113
&& !StringUtils.isBlank(
111-
config.getString(BaseSinkConfig.FILENAME_EXTENSION.key()))) {
112-
this.filenameExtension = config.getString(BaseSinkConfig.FILENAME_EXTENSION.key());
114+
config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key()))) {
115+
this.filenameExtension = config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key());
113116
}
114117

115-
if (config.hasPath(BaseSinkConfig.DATE_FORMAT.key())) {
118+
if (config.hasPath(FileBaseSinkOptions.DATE_FORMAT.key())) {
116119
dateFormat =
117-
DateUtils.Formatter.parse(config.getString(BaseSinkConfig.DATE_FORMAT.key()));
120+
DateUtils.Formatter.parse(
121+
config.getString(FileBaseSinkOptions.DATE_FORMAT.key()));
118122
}
119123

120-
if (config.hasPath(BaseSinkConfig.DATETIME_FORMAT.key())) {
124+
if (config.hasPath(FileBaseSinkOptions.DATETIME_FORMAT.key())) {
121125
datetimeFormat =
122126
DateTimeUtils.Formatter.parse(
123-
config.getString(BaseSinkConfig.DATETIME_FORMAT.key()));
127+
config.getString(FileBaseSinkOptions.DATETIME_FORMAT.key()));
124128
}
125129

126-
if (config.hasPath(BaseSinkConfig.TIME_FORMAT.key())) {
130+
if (config.hasPath(FileBaseSinkOptions.TIME_FORMAT.key())) {
127131
timeFormat =
128-
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
132+
TimeUtils.Formatter.parse(
133+
config.getString(FileBaseSinkOptions.TIME_FORMAT.key()));
129134
}
130135

131-
if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
132-
enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
136+
if (config.hasPath(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key())) {
137+
enableHeaderWriter = config.getBoolean(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key());
133138
}
134139
}
135140

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public abstract class BaseFileSourceConfig implements Serializable {
5959

6060
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
6161
this.baseFileSourceConfig = readonlyConfig;
62-
this.fileFormat = readonlyConfig.get(BaseSourceConfigOptions.FILE_FORMAT_TYPE);
62+
this.fileFormat = readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
6363
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, getHadoopConfig());
6464
this.filePaths = parseFilePaths(readonlyConfig);
6565

@@ -69,7 +69,7 @@ public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
6969
private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
7070
String rootPath = null;
7171
try {
72-
rootPath = readonlyConfig.get(BaseSourceConfigOptions.FILE_PATH);
72+
rootPath = readonlyConfig.get(FileBaseSourceOptions.FILE_PATH);
7373
return readStrategy.getFileNamesByPath(rootPath);
7474
} catch (Exception ex) {
7575
String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);

0 commit comments

Comments
 (0)