Skip to content

Revert "[improve] update localfile connector config" #9018

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,6 @@ public void checkConnectorOptionExist() {
.getNameAsString()
.equals(
"IncrementalSource")
|| extendedType
.getNameAsString()
.equals(
"BaseMultipleTableFileSink")
|| extendedType
.getNameAsString()
.equals(
"BaseFileSource")
|| extendedType
.getNameAsString()
.equals(
"BaseFileSink")
|| extendedType
.getNameAsString()
.equals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
Expand All @@ -51,26 +51,25 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (Objects.isNull(hadoopConf)) {
hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
}
if (pluginConfig.hasPath(FileBaseSinkOptions.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(
pluginConfig.getString(FileBaseSinkOptions.HDFS_SITE_PATH.key()));
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
}

if (pluginConfig.hasPath(FileBaseSinkOptions.REMOTE_USER.key())) {
hadoopConf.setRemoteUser(pluginConfig.getString(FileBaseSinkOptions.REMOTE_USER.key()));
if (pluginConfig.hasPath(BaseSinkConfig.REMOTE_USER.key())) {
hadoopConf.setRemoteUser(pluginConfig.getString(BaseSinkConfig.REMOTE_USER.key()));
}

if (pluginConfig.hasPath(FileBaseSinkOptions.KRB5_PATH.key())) {
hadoopConf.setKrb5Path(pluginConfig.getString(FileBaseSinkOptions.KRB5_PATH.key()));
if (pluginConfig.hasPath(BaseSinkConfig.KRB5_PATH.key())) {
hadoopConf.setKrb5Path(pluginConfig.getString(BaseSinkConfig.KRB5_PATH.key()));
}

if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key())) {
if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
hadoopConf.setKerberosPrincipal(
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key()));
pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
}
if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key())) {
if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())) {
hadoopConf.setKerberosKeytabPath(
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key()));
pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;

public class HdfsSourceConfigOptions extends FileBaseSourceOptions {
public class HdfsSourceConfigOptions extends BaseSourceConfigOptions {
public static final Option<String> DEFAULT_FS =
Options.key(FS_DEFAULT_NAME_KEY)
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,104 +37,99 @@
@Data
public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
private static final long serialVersionUID = 1L;
protected CompressFormat compressFormat = FileBaseSinkOptions.COMPRESS_CODEC.defaultValue();
protected String fieldDelimiter = FileBaseSinkOptions.FIELD_DELIMITER.defaultValue();
protected String rowDelimiter = FileBaseSinkOptions.ROW_DELIMITER.defaultValue();
protected int batchSize = FileBaseSinkOptions.BATCH_SIZE.defaultValue();
protected CompressFormat compressFormat = BaseSinkConfig.COMPRESS_CODEC.defaultValue();
protected String fieldDelimiter = BaseSinkConfig.FIELD_DELIMITER.defaultValue();
protected String rowDelimiter = BaseSinkConfig.ROW_DELIMITER.defaultValue();
protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
protected String path;
protected String fileNameExpression = FileBaseSinkOptions.FILE_NAME_EXPRESSION.defaultValue();
protected boolean singleFileMode = FileBaseSinkOptions.SINGLE_FILE_MODE.defaultValue();
protected String fileNameExpression = BaseSinkConfig.FILE_NAME_EXPRESSION.defaultValue();
protected boolean singleFileMode = BaseSinkConfig.SINGLE_FILE_MODE.defaultValue();
protected boolean createEmptyFileWhenNoData =
FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
protected FileFormat fileFormat;
protected String filenameExtension = FileBaseSinkOptions.FILENAME_EXTENSION.defaultValue();
protected String filenameExtension = BaseSinkConfig.FILENAME_EXTENSION.defaultValue();
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
protected Boolean enableHeaderWriter = false;

public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(FileBaseSinkOptions.COMPRESS_CODEC.key())) {
String compressCodec = config.getString(FileBaseSinkOptions.COMPRESS_CODEC.key());
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
String compressCodec = config.getString(BaseSinkConfig.COMPRESS_CODEC.key());
this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
}
if (config.hasPath(FileBaseSinkOptions.BATCH_SIZE.key())) {
this.batchSize = config.getInt(FileBaseSinkOptions.BATCH_SIZE.key());
if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
}
if (config.hasPath(FileBaseSinkOptions.FIELD_DELIMITER.key())
&& StringUtils.isNotEmpty(
config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key()))) {
this.fieldDelimiter = config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key());
if (config.hasPath(BaseSinkConfig.FIELD_DELIMITER.key())
&& StringUtils.isNotEmpty(config.getString(BaseSinkConfig.FIELD_DELIMITER.key()))) {
this.fieldDelimiter = config.getString(BaseSinkConfig.FIELD_DELIMITER.key());
}

if (config.hasPath(FileBaseSinkOptions.ROW_DELIMITER.key())) {
this.rowDelimiter = config.getString(FileBaseSinkOptions.ROW_DELIMITER.key());
if (config.hasPath(BaseSinkConfig.ROW_DELIMITER.key())) {
this.rowDelimiter = config.getString(BaseSinkConfig.ROW_DELIMITER.key());
}

if (config.hasPath(FileBaseSinkOptions.FILE_PATH.key())
&& !StringUtils.isBlank(config.getString(FileBaseSinkOptions.FILE_PATH.key()))) {
this.path = config.getString(FileBaseSinkOptions.FILE_PATH.key());
if (config.hasPath(BaseSinkConfig.FILE_PATH.key())
&& !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_PATH.key()))) {
this.path = config.getString(BaseSinkConfig.FILE_PATH.key());
}
checkNotNull(path);

if (path.equals(File.separator)) {
this.path = "";
}

if (config.hasPath(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key())
if (config.hasPath(BaseSinkConfig.FILE_NAME_EXPRESSION.key())
&& !StringUtils.isBlank(
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key()))) {
this.fileNameExpression =
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key());
config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key()))) {
this.fileNameExpression = config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key());
}

if (config.hasPath(FileBaseSinkOptions.SINGLE_FILE_MODE.key())) {
this.singleFileMode = config.getBoolean(FileBaseSinkOptions.SINGLE_FILE_MODE.key());
if (config.hasPath(BaseSinkConfig.SINGLE_FILE_MODE.key())) {
this.singleFileMode = config.getBoolean(BaseSinkConfig.SINGLE_FILE_MODE.key());
}

if (config.hasPath(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
if (config.hasPath(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
this.createEmptyFileWhenNoData =
config.getBoolean(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
config.getBoolean(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
}

if (config.hasPath(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
&& !StringUtils.isBlank(
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key()))) {
if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key())
&& !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
this.fileFormat =
FileFormat.valueOf(
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key())
.toUpperCase(Locale.ROOT));
} else {
// fall back to the default
this.fileFormat = FileBaseSinkOptions.FILE_FORMAT_TYPE.defaultValue();
this.fileFormat = BaseSinkConfig.FILE_FORMAT_TYPE.defaultValue();
}

if (config.hasPath(FileBaseSinkOptions.FILENAME_EXTENSION.key())
if (config.hasPath(BaseSinkConfig.FILENAME_EXTENSION.key())
&& !StringUtils.isBlank(
config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key()))) {
this.filenameExtension = config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key());
config.getString(BaseSinkConfig.FILENAME_EXTENSION.key()))) {
this.filenameExtension = config.getString(BaseSinkConfig.FILENAME_EXTENSION.key());
}

if (config.hasPath(FileBaseSinkOptions.DATE_FORMAT.key())) {
if (config.hasPath(BaseSinkConfig.DATE_FORMAT.key())) {
dateFormat =
DateUtils.Formatter.parse(
config.getString(FileBaseSinkOptions.DATE_FORMAT.key()));
DateUtils.Formatter.parse(config.getString(BaseSinkConfig.DATE_FORMAT.key()));
}

if (config.hasPath(FileBaseSinkOptions.DATETIME_FORMAT.key())) {
if (config.hasPath(BaseSinkConfig.DATETIME_FORMAT.key())) {
datetimeFormat =
DateTimeUtils.Formatter.parse(
config.getString(FileBaseSinkOptions.DATETIME_FORMAT.key()));
config.getString(BaseSinkConfig.DATETIME_FORMAT.key()));
}

if (config.hasPath(FileBaseSinkOptions.TIME_FORMAT.key())) {
if (config.hasPath(BaseSinkConfig.TIME_FORMAT.key())) {
timeFormat =
TimeUtils.Formatter.parse(
config.getString(FileBaseSinkOptions.TIME_FORMAT.key()));
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
}

if (config.hasPath(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key())) {
enableHeaderWriter = config.getBoolean(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key());
if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public abstract class BaseFileSourceConfig implements Serializable {

public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
this.baseFileSourceConfig = readonlyConfig;
this.fileFormat = readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
this.fileFormat = readonlyConfig.get(BaseSourceConfigOptions.FILE_FORMAT_TYPE);
this.readStrategy = ReadStrategyFactory.of(readonlyConfig, getHadoopConfig());
this.filePaths = parseFilePaths(readonlyConfig);

Expand All @@ -69,7 +69,7 @@ public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
String rootPath = null;
try {
rootPath = readonlyConfig.get(FileBaseSourceOptions.FILE_PATH);
rootPath = readonlyConfig.get(BaseSourceConfigOptions.FILE_PATH);
return readStrategy.getFileNamesByPath(rootPath);
} catch (Exception ex) {
String errorMsg = String.format("Get file list from this path [%s] failed", rootPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;

public class FileBaseSinkOptions extends FileBaseOptions {
public class BaseSinkConfig {
public static final String SEATUNNEL = "seatunnel";
public static final String NON_PARTITION = "NON_PARTITION";
public static final String TRANSACTION_ID_SPLIT = "_";
Expand Down Expand Up @@ -214,6 +214,13 @@ public class FileBaseSinkOptions extends FileBaseOptions {
.defaultValue(FileFormat.CSV)
.withDescription("File format type, e.g. csv, orc, parquet, text");

/**
 * Option keyed {@code filename_extension}: a string-typed setting declared with no default
 * value, described as overriding the default file name extension with a custom one.
 *
 * <p>NOTE(review): the examples in the description mix dotted ({@code .xml}) and undotted
 * ({@code dat}) forms; whether both spellings are accepted is not visible here, so confirm
 * against the code that consumes this option before normalizing the text.
 */
public static final Option<String> FILENAME_EXTENSION =
Options.key("filename_extension")
.stringType()
.noDefaultValue()
.withDescription(
"Override the default file name extensions with custom file name extensions. E.g. `.xml`, `.json`, `dat`, `.customtype`");

public static final Option<String> ENCODING =
Options.key("encoding")
.stringType()
Expand Down
Loading
Loading