Skip to content

Commit c0baac8

Browse files
author
misi
committed
Revert "[improve] modify deprecated method"
This reverts commit a9fc8ca.
1 parent a9fc8ca commit c0baac8

File tree

4 files changed

+46
-17
lines changed

4 files changed

+46
-17
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ public void setJobContext(JobContext jobContext) {
7575
preCheckConfig();
7676
}
7777

78+
@Override
79+
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
80+
this.seaTunnelRowType = seaTunnelRowType;
81+
this.fileSinkConfig = new FileSinkConfig(pluginConfig, seaTunnelRowType);
82+
}
83+
7884
@Override
7985
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
8086
SinkWriter.Context context, List<FileSinkState> states) {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
2424
import org.apache.seatunnel.api.source.SupportColumnProjection;
2525
import org.apache.seatunnel.api.source.SupportParallelism;
26-
import org.apache.seatunnel.api.table.catalog.CatalogTable;
26+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2727
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2828
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2929
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -48,6 +48,11 @@ public Boundedness getBoundedness() {
4848
return Boundedness.BOUNDED;
4949
}
5050

51+
@Override
52+
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
53+
return rowType;
54+
}
55+
5156
@Override
5257
public SourceReader<SeaTunnelRow, FileSourceSplit> createReader(
5358
SourceReader.Context readerContext) {
@@ -67,9 +72,4 @@ public SourceSplitEnumerator<FileSourceSplit, FileSourceState> restoreEnumerator
6772
throws Exception {
6873
return new FileSourceSplitEnumerator(enumeratorContext, filePaths, checkpointState);
6974
}
70-
71-
@Override
72-
public List<CatalogTable> getProducedCatalogTables() {
73-
return SeaTunnelSource.super.getProducedCatalogTables();
74-
}
7575
}

seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,21 @@
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

22-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
22+
import org.apache.seatunnel.api.common.PrepareFailException;
23+
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2324
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
2425
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2526
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
27+
import org.apache.seatunnel.common.config.CheckConfigUtil;
28+
import org.apache.seatunnel.common.config.CheckResult;
29+
import org.apache.seatunnel.common.constants.PluginType;
2630
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
31+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
2732
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
2833
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
2934
import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConf;
3035
import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosFileBaseOptions;
36+
import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosFileSourceOptions;
3137
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
3238
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
3339
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -41,9 +47,33 @@ public class CosFileSource extends BaseFileSource {
4147

4248
private final CatalogTable catalogTable;
4349

44-
public CosFileSource(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
50+
public CosFileSource(CatalogTable catalogTable) {
4551
this.catalogTable = catalogTable;
46-
Config pluginConfig = readonlyConfig.toConfig();
52+
}
53+
54+
@Override
55+
public String getPluginName() {
56+
return FileSystemType.COS.getFileSystemPluginName();
57+
}
58+
59+
@Override
60+
public void prepare(Config pluginConfig) throws PrepareFailException {
61+
CheckResult result =
62+
CheckConfigUtil.checkAllExists(
63+
pluginConfig,
64+
FileBaseOptions.FILE_PATH.key(),
65+
CosFileSourceOptions.FILE_FORMAT_TYPE.key(),
66+
CosFileSourceOptions.SECRET_ID.key(),
67+
CosFileSourceOptions.SECRET_KEY.key(),
68+
CosFileSourceOptions.REGION.key(),
69+
CosFileSourceOptions.BUCKET.key());
70+
if (!result.isSuccess()) {
71+
throw new FileConnectorException(
72+
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
73+
String.format(
74+
"PluginName: %s, PluginType: %s, Message: %s",
75+
getPluginName(), PluginType.SOURCE, result.getMsg()));
76+
}
4777
String path = pluginConfig.getString(CosFileBaseOptions.FILE_PATH.key());
4878
hadoopConf = CosConf.buildWithConfig(pluginConfig);
4979
readStrategy =
@@ -106,11 +136,6 @@ public CosFileSource(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
106136
}
107137
}
108138

109-
@Override
110-
public String getPluginName() {
111-
return FileSystemType.COS.getFileSystemPluginName();
112-
}
113-
114139
@Override
115140
public List<CatalogTable> getProducedCatalogTables() {
116141
return Collections.singletonList(catalogTable);

seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,6 @@ public Class<? extends SeaTunnelSource> getSourceClass() {
108108
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
109109
return () ->
110110
(SeaTunnelSource<T, SplitT, StateT>)
111-
new CosFileSource(
112-
context.getOptions(),
113-
CatalogTableUtil.buildWithConfig(context.getOptions()));
111+
new CosFileSource(CatalogTableUtil.buildWithConfig(context.getOptions()));
114112
}
115113
}

0 commit comments

Comments
 (0)