-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[improve] Adjust CosFile connector instantiation #9121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
8e86af1
776e8b7
b99661f
a9fc8ca
c0baac8
98fac43
7b8edfc
37f68aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,7 @@ | |
| import org.apache.seatunnel.api.source.SourceSplitEnumerator; | ||
| import org.apache.seatunnel.api.source.SupportColumnProjection; | ||
| import org.apache.seatunnel.api.source.SupportParallelism; | ||
| import org.apache.seatunnel.api.table.type.SeaTunnelDataType; | ||
| import org.apache.seatunnel.api.table.catalog.CatalogTable; | ||
| import org.apache.seatunnel.api.table.type.SeaTunnelRow; | ||
| import org.apache.seatunnel.api.table.type.SeaTunnelRowType; | ||
| import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; | ||
|
|
@@ -48,11 +48,6 @@ public Boundedness getBoundedness() { | |
| return Boundedness.BOUNDED; | ||
| } | ||
|
|
||
| @Override | ||
| public SeaTunnelDataType<SeaTunnelRow> getProducedType() { | ||
| return rowType; | ||
| } | ||
|
|
||
| @Override | ||
| public SourceReader<SeaTunnelRow, FileSourceSplit> createReader( | ||
| SourceReader.Context readerContext) { | ||
|
|
@@ -72,4 +67,9 @@ public SourceSplitEnumerator<FileSourceSplit, FileSourceState> restoreEnumerator | |
| throws Exception { | ||
| return new FileSourceSplitEnumerator(enumeratorContext, filePaths, checkpointState); | ||
| } | ||
|
|
||
| @Override | ||
| public List<CatalogTable> getProducedCatalogTables() { | ||
| return SeaTunnelSource.super.getProducedCatalogTables(); | ||
| } | ||
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,8 +20,12 @@ | |
| import org.apache.seatunnel.api.configuration.util.OptionRule; | ||
| import org.apache.seatunnel.api.options.ConnectorCommonOptions; | ||
| import org.apache.seatunnel.api.source.SeaTunnelSource; | ||
| import org.apache.seatunnel.api.source.SourceSplit; | ||
| import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; | ||
| import org.apache.seatunnel.api.table.connector.TableSource; | ||
| import org.apache.seatunnel.api.table.factory.Factory; | ||
| import org.apache.seatunnel.api.table.factory.TableSourceFactory; | ||
| import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; | ||
| import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions; | ||
| import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions; | ||
| import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; | ||
|
|
@@ -30,6 +34,7 @@ | |
|
|
||
| import com.google.auto.service.AutoService; | ||
|
|
||
| import java.io.Serializable; | ||
| import java.util.Arrays; | ||
|
|
||
| @AutoService(Factory.class) | ||
|
|
@@ -97,4 +102,14 @@ public OptionRule optionRule() { | |
| public Class<? extends SeaTunnelSource> getSourceClass() { | ||
| return CosFileSource.class; | ||
| } | ||
|
|
||
| @Override | ||
| public <T, SplitT extends SourceSplit, StateT extends Serializable> | ||
| TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) { | ||
| return () -> | ||
| (SeaTunnelSource<T, SplitT, StateT>) | ||
| new CosFileSource( | ||
| context.getOptions(), | ||
| CatalogTableUtil.buildWithConfig(context.getOptions())); | ||
|
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why delete this? We still have some sink should invoke it like
OssFileSink,HdfsFileSinketc.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry,I neet to make adjustments according to purpose2 ,move the connector creation logic from Connector to ConnectorFactory.I don't know how to adjust it
(#8576 (comment))