Skip to content

[improve] update localfile connector config #8765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
be91708
[improve] update localfile connector config
Feb 14, 2025
a00c1df
[improve] update localfile connector config
Feb 17, 2025
5f0b016
[improve] update localfile connector config
Feb 19, 2025
5d23141
[improve] update localfile connector config
Feb 25, 2025
d5774f7
[improve] code format
Feb 25, 2025
39e2f69
[improve] code format
Feb 25, 2025
884724e
[improve] adjust parameter relationship
Feb 25, 2025
3fd73b1
[improve] adjust parameter relationship
Feb 26, 2025
694e11d
[improve] adjust parameter relationship
Feb 26, 2025
5e553ef
Merge remote-tracking branch 'upstream/dev' into feature-improve-loca…
Feb 26, 2025
5f3076d
Merge branch 'dev' into feature-improve-localfile-config
Feb 26, 2025
437f700
[improve] adjust public parameters
Feb 26, 2025
22f20df
[improve] adjust public parameters
Feb 26, 2025
e73d437
Merge branch 'dev' into feature-improve-localfile-config
misi1987107 Feb 26, 2025
d5cefd1
[improve] adjust whitelist and public params
Feb 26, 2025
e33c6bd
[improve] Parameter class modification
misi1987107 Feb 26, 2025
3460291
[improve] adjust obsfile parameters
Feb 27, 2025
b3b593c
[improve] adjust Ossfile\Cosfile parameters ,modify ConnectorOptionCh…
Mar 3, 2025
bb95982
[improve] adjust the parameter optionRule
Mar 5, 2025
3e5a5ff
[improve] remove duplicate param
Mar 6, 2025
d63a9ba
[improve] remove duplicate params
Mar 6, 2025
b3cc46b
[improve] remove duplicate params
Mar 6, 2025
8b7f54e
[improve] remove duplicate params
Mar 6, 2025
ef9d873
[improve] add exclusive params
Mar 7, 2025
5579198
[improve] code format
Mar 7, 2025
3b24695
[improve] remove duplicate params
Mar 7, 2025
4cdd287
[improve] remove duplicate params
Mar 7, 2025
3108f1d
Merge branch 'dev' into feature-improve-localfile-config
misi1987107 Mar 10, 2025
474bf08
[improve] remove duplicate params
Mar 10, 2025
7947ed2
[improve] fix with https://github.com/apache/seatunnel/pull/8885
Mar 10, 2025
aa0243c
Merge branch 'dev' into feature-improve-localfile-config
misi1987107 Mar 18, 2025
22e3166
Merge branch 'dev' into feature-improve-localfile-config
misi1987107 Mar 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ public void checkConnectorOptionExist() {
.getNameAsString()
.equals(
"IncrementalSource")
|| extendedType
.getNameAsString()
.equals(
"BaseMultipleTableFileSink")
|| extendedType
.getNameAsString()
.equals(
"BaseFileSource")
|| extendedType
.getNameAsString()
.equals(
"BaseFileSink")
|| extendedType
.getNameAsString()
.equals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
Expand All @@ -51,25 +51,26 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (Objects.isNull(hadoopConf)) {
hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
}
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
if (pluginConfig.hasPath(FileBaseSinkOptions.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(
pluginConfig.getString(FileBaseSinkOptions.HDFS_SITE_PATH.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.REMOTE_USER.key())) {
hadoopConf.setRemoteUser(pluginConfig.getString(BaseSinkConfig.REMOTE_USER.key()));
if (pluginConfig.hasPath(FileBaseSinkOptions.REMOTE_USER.key())) {
hadoopConf.setRemoteUser(pluginConfig.getString(FileBaseSinkOptions.REMOTE_USER.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.KRB5_PATH.key())) {
hadoopConf.setKrb5Path(pluginConfig.getString(BaseSinkConfig.KRB5_PATH.key()));
if (pluginConfig.hasPath(FileBaseSinkOptions.KRB5_PATH.key())) {
hadoopConf.setKrb5Path(pluginConfig.getString(FileBaseSinkOptions.KRB5_PATH.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key())) {
hadoopConf.setKerberosPrincipal(
pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key()));
}
if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())) {
if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key())) {
hadoopConf.setKerberosKeytabPath(
pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;

public class HdfsSourceConfigOptions extends BaseSourceConfigOptions {
public class HdfsSourceConfigOptions extends FileBaseSourceOptions {
public static final Option<String> DEFAULT_FS =
Options.key(FS_DEFAULT_NAME_KEY)
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,99 +37,104 @@
@Data
public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
private static final long serialVersionUID = 1L;
protected CompressFormat compressFormat = BaseSinkConfig.COMPRESS_CODEC.defaultValue();
protected String fieldDelimiter = BaseSinkConfig.FIELD_DELIMITER.defaultValue();
protected String rowDelimiter = BaseSinkConfig.ROW_DELIMITER.defaultValue();
protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
protected CompressFormat compressFormat = FileBaseSinkOptions.COMPRESS_CODEC.defaultValue();
protected String fieldDelimiter = FileBaseSinkOptions.FIELD_DELIMITER.defaultValue();
protected String rowDelimiter = FileBaseSinkOptions.ROW_DELIMITER.defaultValue();
protected int batchSize = FileBaseSinkOptions.BATCH_SIZE.defaultValue();
protected String path;
protected String fileNameExpression = BaseSinkConfig.FILE_NAME_EXPRESSION.defaultValue();
protected boolean singleFileMode = BaseSinkConfig.SINGLE_FILE_MODE.defaultValue();
protected String fileNameExpression = FileBaseSinkOptions.FILE_NAME_EXPRESSION.defaultValue();
protected boolean singleFileMode = FileBaseSinkOptions.SINGLE_FILE_MODE.defaultValue();
protected boolean createEmptyFileWhenNoData =
BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
protected FileFormat fileFormat;
protected String filenameExtension = BaseSinkConfig.FILENAME_EXTENSION.defaultValue();
protected String filenameExtension = FileBaseSinkOptions.FILENAME_EXTENSION.defaultValue();
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
protected Boolean enableHeaderWriter = false;

public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
String compressCodec = config.getString(BaseSinkConfig.COMPRESS_CODEC.key());
if (config.hasPath(FileBaseSinkOptions.COMPRESS_CODEC.key())) {
String compressCodec = config.getString(FileBaseSinkOptions.COMPRESS_CODEC.key());
this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
}
if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
if (config.hasPath(FileBaseSinkOptions.BATCH_SIZE.key())) {
this.batchSize = config.getInt(FileBaseSinkOptions.BATCH_SIZE.key());
}
if (config.hasPath(BaseSinkConfig.FIELD_DELIMITER.key())
&& StringUtils.isNotEmpty(config.getString(BaseSinkConfig.FIELD_DELIMITER.key()))) {
this.fieldDelimiter = config.getString(BaseSinkConfig.FIELD_DELIMITER.key());
if (config.hasPath(FileBaseSinkOptions.FIELD_DELIMITER.key())
&& StringUtils.isNotEmpty(
config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key()))) {
this.fieldDelimiter = config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key());
}

if (config.hasPath(BaseSinkConfig.ROW_DELIMITER.key())) {
this.rowDelimiter = config.getString(BaseSinkConfig.ROW_DELIMITER.key());
if (config.hasPath(FileBaseSinkOptions.ROW_DELIMITER.key())) {
this.rowDelimiter = config.getString(FileBaseSinkOptions.ROW_DELIMITER.key());
}

if (config.hasPath(BaseSinkConfig.FILE_PATH.key())
&& !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_PATH.key()))) {
this.path = config.getString(BaseSinkConfig.FILE_PATH.key());
if (config.hasPath(FileBaseSinkOptions.FILE_PATH.key())
&& !StringUtils.isBlank(config.getString(FileBaseSinkOptions.FILE_PATH.key()))) {
this.path = config.getString(FileBaseSinkOptions.FILE_PATH.key());
}
checkNotNull(path);

if (path.equals(File.separator)) {
this.path = "";
}

if (config.hasPath(BaseSinkConfig.FILE_NAME_EXPRESSION.key())
if (config.hasPath(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key())
&& !StringUtils.isBlank(
config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key()))) {
this.fileNameExpression = config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key());
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key()))) {
this.fileNameExpression =
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key());
}

if (config.hasPath(BaseSinkConfig.SINGLE_FILE_MODE.key())) {
this.singleFileMode = config.getBoolean(BaseSinkConfig.SINGLE_FILE_MODE.key());
if (config.hasPath(FileBaseSinkOptions.SINGLE_FILE_MODE.key())) {
this.singleFileMode = config.getBoolean(FileBaseSinkOptions.SINGLE_FILE_MODE.key());
}

if (config.hasPath(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
if (config.hasPath(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
this.createEmptyFileWhenNoData =
config.getBoolean(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
config.getBoolean(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
}

if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key())
&& !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
if (config.hasPath(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
&& !StringUtils.isBlank(
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key()))) {
this.fileFormat =
FileFormat.valueOf(
config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key())
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
.toUpperCase(Locale.ROOT));
} else {
// fall back to the default
this.fileFormat = BaseSinkConfig.FILE_FORMAT_TYPE.defaultValue();
this.fileFormat = FileBaseSinkOptions.FILE_FORMAT_TYPE.defaultValue();
}

if (config.hasPath(BaseSinkConfig.FILENAME_EXTENSION.key())
if (config.hasPath(FileBaseSinkOptions.FILENAME_EXTENSION.key())
&& !StringUtils.isBlank(
config.getString(BaseSinkConfig.FILENAME_EXTENSION.key()))) {
this.filenameExtension = config.getString(BaseSinkConfig.FILENAME_EXTENSION.key());
config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key()))) {
this.filenameExtension = config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key());
}

if (config.hasPath(BaseSinkConfig.DATE_FORMAT.key())) {
if (config.hasPath(FileBaseSinkOptions.DATE_FORMAT.key())) {
dateFormat =
DateUtils.Formatter.parse(config.getString(BaseSinkConfig.DATE_FORMAT.key()));
DateUtils.Formatter.parse(
config.getString(FileBaseSinkOptions.DATE_FORMAT.key()));
}

if (config.hasPath(BaseSinkConfig.DATETIME_FORMAT.key())) {
if (config.hasPath(FileBaseSinkOptions.DATETIME_FORMAT.key())) {
datetimeFormat =
DateTimeUtils.Formatter.parse(
config.getString(BaseSinkConfig.DATETIME_FORMAT.key()));
config.getString(FileBaseSinkOptions.DATETIME_FORMAT.key()));
}

if (config.hasPath(BaseSinkConfig.TIME_FORMAT.key())) {
if (config.hasPath(FileBaseSinkOptions.TIME_FORMAT.key())) {
timeFormat =
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
TimeUtils.Formatter.parse(
config.getString(FileBaseSinkOptions.TIME_FORMAT.key()));
}

if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
if (config.hasPath(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key())) {
enableHeaderWriter = config.getBoolean(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public abstract class BaseFileSourceConfig implements Serializable {

public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
this.baseFileSourceConfig = readonlyConfig;
this.fileFormat = readonlyConfig.get(BaseSourceConfigOptions.FILE_FORMAT_TYPE);
this.fileFormat = readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, getHadoopConfig());
this.filePaths = parseFilePaths(readonlyConfig);

Expand All @@ -69,7 +69,7 @@ public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
String rootPath = null;
try {
rootPath = readonlyConfig.get(BaseSourceConfigOptions.FILE_PATH);
rootPath = readonlyConfig.get(FileBaseSourceOptions.FILE_PATH);
return readStrategy.getFileNamesByPath(rootPath);
} catch (Exception ex) {
String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
Expand Down
Loading
Loading