Skip to content

Commit 99fa19d

Browse files
authored
[Refactor][core] Unify transformFactory creation logic (#8574)
1 parent 6468a1b commit 99fa19d

File tree

26 files changed

+225
-215
lines changed

26 files changed

+225
-215
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java

+39-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.api.table.factory;
1919

2020
import org.apache.seatunnel.api.common.CommonOptions;
21+
import org.apache.seatunnel.api.common.JobContext;
2122
import org.apache.seatunnel.api.common.PluginIdentifier;
2223
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2324
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
@@ -37,6 +38,9 @@
3738
import org.apache.seatunnel.api.table.connector.TableSource;
3839
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3940
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
41+
import org.apache.seatunnel.common.constants.EngineType;
42+
import org.apache.seatunnel.common.constants.JobMode;
43+
import org.apache.seatunnel.common.constants.PluginType;
4044
import org.apache.seatunnel.common.utils.ExceptionUtils;
4145

4246
import org.slf4j.Logger;
@@ -106,7 +110,10 @@ Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>> restoreAndPrepare
106110
if (fallback) {
107111
source =
108112
fallbackCreateSource.apply(
109-
PluginIdentifier.of("seatunnel", "source", factoryId));
113+
PluginIdentifier.of(
114+
EngineType.SEATUNNEL.getEngine(),
115+
PluginType.SOURCE.getType(),
116+
factoryId));
110117
source.prepare(options.toConfig());
111118

112119
} else {
@@ -205,7 +212,10 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSi
205212
if (fallback) {
206213
SeaTunnelSink sink =
207214
fallbackCreateSink.apply(
208-
PluginIdentifier.of("seatunnel", "sink", factoryId));
215+
PluginIdentifier.of(
216+
EngineType.SEATUNNEL.getEngine(),
217+
PluginType.SINK.getType(),
218+
factoryId));
209219
sink.prepare(config.toConfig());
210220
sink.setTypeInfo(catalogTable.getSeaTunnelRowType());
211221

@@ -273,6 +283,23 @@ public static <T extends Factory> URL getFactoryUrl(T factory) {
273283
return factory.getClass().getProtectionDomain().getCodeSource().getLocation();
274284
}
275285

286+
public static <T extends Factory> Optional<T> discoverOptionalFactory(
287+
ClassLoader classLoader,
288+
Class<T> factoryClass,
289+
String factoryIdentifier,
290+
Function<String, T> discoverOptionalFactoryFunction) {
291+
292+
if (discoverOptionalFactoryFunction != null) {
293+
T apply = discoverOptionalFactoryFunction.apply(factoryIdentifier);
294+
if (apply != null) {
295+
return Optional.of(apply);
296+
} else {
297+
return Optional.empty();
298+
}
299+
}
300+
return discoverOptionalFactory(classLoader, factoryClass, factoryIdentifier);
301+
}
302+
276303
public static <T extends Factory> Optional<T> discoverOptionalFactory(
277304
ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
278305
final List<T> foundFactories = discoverFactories(classLoader, factoryClass);
@@ -436,4 +463,14 @@ private static <T extends Factory> boolean isFallback(
436463
}
437464
return false;
438465
}
466+
467+
public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource source) {
468+
if (jobContext.getJobMode() == JobMode.BATCH
469+
&& source.getBoundedness()
470+
== org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
471+
throw new UnsupportedOperationException(
472+
String.format(
473+
"'%s' source don't support off-line job.", source.getPluginName()));
474+
}
475+
}
439476
}
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.seatunnel.core.starter.enums;
18+
package org.apache.seatunnel.common.constants;
1919

2020
/** Engine type enum */
2121
public enum EngineType {

seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java

-35
This file was deleted.

seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java

-87
This file was deleted.

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.seatunnel.core.starter.flink;
1919

2020
import org.apache.seatunnel.common.config.Common;
21+
import org.apache.seatunnel.common.constants.EngineType;
2122
import org.apache.seatunnel.core.starter.Starter;
22-
import org.apache.seatunnel.core.starter.enums.EngineType;
2323
import org.apache.seatunnel.core.starter.enums.MasterType;
2424
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
2525
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package org.apache.seatunnel.core.starter.flink;
1919

20+
import org.apache.seatunnel.common.constants.EngineType;
2021
import org.apache.seatunnel.core.starter.SeaTunnel;
21-
import org.apache.seatunnel.core.starter.enums.EngineType;
2222
import org.apache.seatunnel.core.starter.exception.CommandException;
2323
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
2424
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java

+30-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.core.starter.flink.execution;
1919

20+
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
2021
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2122

2223
import org.apache.seatunnel.api.common.CommonOptions;
@@ -35,9 +36,10 @@
3536
import org.apache.seatunnel.api.table.factory.FactoryUtil;
3637
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
3738
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
39+
import org.apache.seatunnel.common.constants.EngineType;
40+
import org.apache.seatunnel.common.constants.PluginType;
3841
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
3942
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
40-
import org.apache.seatunnel.core.starter.execution.PluginUtil;
4143
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
4244
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
4345
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
@@ -56,6 +58,7 @@
5658

5759
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
5860
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
61+
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
5962

6063
@SuppressWarnings({"unchecked", "rawtypes"})
6164
@Slf4j
@@ -77,14 +80,34 @@ protected List<Optional<? extends Factory>> initializePlugins(
7780
new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER);
7881
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
7982
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
83+
Function<String, TableSinkFactory> discoverOptionalFactoryFunction =
84+
pluginName ->
85+
(TableSinkFactory)
86+
factoryDiscovery
87+
.createOptionalPluginInstance(
88+
PluginIdentifier.of(
89+
EngineType.SEATUNNEL.getEngine(),
90+
PluginType.SINK.getType(),
91+
pluginName))
92+
.orElse(null);
93+
8094
return pluginConfigs.stream()
8195
.map(
82-
sinkConfig ->
83-
PluginUtil.createSinkFactory(
84-
factoryDiscovery,
85-
sinkPluginDiscovery,
86-
sinkConfig,
87-
jarPaths))
96+
sinkConfig -> {
97+
jarPaths.addAll(
98+
sinkPluginDiscovery.getPluginJarPaths(
99+
Lists.newArrayList(
100+
PluginIdentifier.of(
101+
EngineType.SEATUNNEL.getEngine(),
102+
PluginType.SINK.getType(),
103+
sinkConfig.getString(
104+
PLUGIN_NAME.key())))));
105+
return discoverOptionalFactory(
106+
classLoader,
107+
TableSinkFactory.class,
108+
sinkConfig.getString(PLUGIN_NAME.key()),
109+
discoverOptionalFactoryFunction);
110+
})
88111
.distinct()
89112
.collect(Collectors.toList());
90113
}
@@ -95,7 +118,6 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
95118
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
96119
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
97120
DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
98-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
99121
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
100122
sinkPluginDiscovery::createPluginInstance;
101123
for (int i = 0; i < plugins.size(); i++) {

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.seatunnel.core.starter.flink;
1919

2020
import org.apache.seatunnel.common.config.Common;
21+
import org.apache.seatunnel.common.constants.EngineType;
2122
import org.apache.seatunnel.core.starter.Starter;
22-
import org.apache.seatunnel.core.starter.enums.EngineType;
2323
import org.apache.seatunnel.core.starter.enums.MasterType;
2424
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
2525
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package org.apache.seatunnel.core.starter.flink;
1919

20+
import org.apache.seatunnel.common.constants.EngineType;
2021
import org.apache.seatunnel.core.starter.SeaTunnel;
21-
import org.apache.seatunnel.core.starter.enums.EngineType;
2222
import org.apache.seatunnel.core.starter.exception.CommandException;
2323
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
2424
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
public abstract class FlinkAbstractPluginExecuteProcessor<T>
3737
implements PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment> {
3838

39-
protected static final String ENGINE_TYPE = "seatunnel";
40-
4139
protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
4240
(classLoader, url) -> {
4341
if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
@@ -57,6 +55,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
5755
protected JobContext jobContext;
5856
protected final List<T> plugins;
5957
protected final Config envConfig;
58+
protected final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
6059

6160
protected FlinkAbstractPluginExecuteProcessor(
6261
List<URL> jarPaths,

0 commit comments

Comments
 (0)