Skip to content

Commit 50c6c94

Browse files
[Improve][Transform] Remove Fallback during parsing Transform process (#6644)
1 parent de4242c commit 50c6c94

File tree

7 files changed

+27
-118
lines changed

7 files changed

+27
-118
lines changed

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java

+4-16
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,14 @@
1818
package org.apache.seatunnel.api.transform;
1919

2020
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
21-
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
2221
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
2322
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2423
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2524

2625
import java.io.Serializable;
2726

2827
public interface SeaTunnelTransform<T>
29-
extends Serializable,
30-
PluginIdentifierInterface,
31-
SeaTunnelPluginLifeCycle,
32-
SeaTunnelJobAware {
28+
extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware {
3329

3430
/** call it when Transformer initialed */
3531
default void open() {}
@@ -45,22 +41,14 @@ default void setTypeInfo(SeaTunnelDataType<T> inputDataType) {
4541
throw new UnsupportedOperationException("setTypeInfo method is not supported");
4642
}
4743

48-
/**
49-
* Get the data type of the records produced by this transform.
50-
*
51-
* @deprecated Please use {@link #getProducedCatalogTable}
52-
* @return Produced data type.
53-
*/
54-
@Deprecated
55-
SeaTunnelDataType<T> getProducedType();
56-
5744
/** Get the catalog table output by this transform */
5845
CatalogTable getProducedCatalogTable();
5946

6047
/**
61-
* Transform input data to {@link this#getProducedType()} types data.
48+
* Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types
49+
* data.
6250
*
63-
* @param row the data need be transform.
51+
* @param row the data need be transformed.
6452
* @return transformed data.
6553
*/
6654
T map(T row);

Diff for: seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.flink.api.common.typeinfo.TypeInformation;
3838
import org.apache.flink.streaming.api.datastream.DataStream;
3939
import org.apache.flink.types.Row;
40-
import org.apache.flink.util.Collector;
4140

4241
import java.net.URL;
4342
import java.util.Collections;
@@ -119,24 +118,25 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
119118

120119
protected DataStream<Row> flinkTransform(
121120
SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream<Row> stream) {
122-
TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
121+
TypeInformation rowTypeInfo =
122+
TypeConverterUtils.convert(
123+
transform.getProducedCatalogTable().getSeaTunnelRowType());
123124
FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType);
124125
FlinkRowConverter transformOutputRowConverter =
125126
new FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType());
126127
DataStream<Row> output =
127128
stream.flatMap(
128-
new FlatMapFunction<Row, Row>() {
129-
@Override
130-
public void flatMap(Row value, Collector<Row> out) throws Exception {
131-
SeaTunnelRow seaTunnelRow =
132-
transformInputRowConverter.reconvert(value);
133-
SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow);
134-
if (dataRow != null) {
135-
Row copy = transformOutputRowConverter.convert(dataRow);
136-
out.collect(copy);
137-
}
138-
}
139-
},
129+
(FlatMapFunction<Row, Row>)
130+
(value, out) -> {
131+
SeaTunnelRow seaTunnelRow =
132+
transformInputRowConverter.reconvert(value);
133+
SeaTunnelRow dataRow =
134+
(SeaTunnelRow) transform.map(seaTunnelRow);
135+
if (dataRow != null) {
136+
Row copy = transformOutputRowConverter.convert(dataRow);
137+
out.collect(copy);
138+
}
139+
},
140140
rowTypeInfo);
141141
return output;
142142
}

Diff for: seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java

-35
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,15 @@
2323
import org.apache.seatunnel.api.source.SeaTunnelSource;
2424
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2525
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
26-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2726
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2827
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
29-
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
3028
import org.apache.seatunnel.common.constants.CollectionConstants;
3129
import org.apache.seatunnel.core.starter.execution.PluginUtil;
3230
import org.apache.seatunnel.engine.common.config.JobConfig;
3331
import org.apache.seatunnel.engine.common.utils.IdGenerator;
3432
import org.apache.seatunnel.engine.core.dag.actions.Action;
3533
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
3634
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
37-
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
3835

3936
import org.apache.commons.lang3.tuple.ImmutablePair;
4037

@@ -99,38 +96,6 @@ public Tuple2<CatalogTable, Action> parseSource(
9996
return new Tuple2<>(catalogTable, action);
10097
}
10198

102-
public Tuple2<CatalogTable, Action> parseTransform(
103-
Config config,
104-
JobConfig jobConfig,
105-
String tableId,
106-
int parallelism,
107-
SeaTunnelRowType rowType,
108-
Set<Action> inputActions) {
109-
final ImmutablePair<SeaTunnelTransform<?>, Set<URL>> tuple =
110-
ConnectorInstanceLoader.loadTransformInstance(
111-
config, jobConfig.getJobContext(), commonPluginJars);
112-
final SeaTunnelTransform<?> transform = tuple.getLeft();
113-
// old logic: prepare(initialization) -> set job context -> set row type (There is a logical
114-
// judgment that depends on before and after, not a simple set)
115-
transform.prepare(config);
116-
transform.setJobContext(jobConfig.getJobContext());
117-
transform.setTypeInfo((SeaTunnelDataType) rowType);
118-
final String actionName = createTransformActionName(0, tuple.getLeft().getPluginName());
119-
final TransformAction action =
120-
new TransformAction(
121-
idGenerator.getNextId(),
122-
actionName,
123-
new ArrayList<>(inputActions),
124-
transform,
125-
tuple.getRight(),
126-
new HashSet<>());
127-
action.setParallelism(parallelism);
128-
CatalogTable catalogTable =
129-
CatalogTableUtil.getCatalogTable(
130-
tableId, (SeaTunnelRowType) transform.getProducedType());
131-
return new Tuple2<>(catalogTable, action);
132-
}
133-
13499
public List<SinkAction<?, ?, ?, ?>> parseSinks(
135100
int configIndex,
136101
List<List<Tuple2<CatalogTable, Action>>> inputVertices,

Diff for: seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java

+4-32
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@
3434
import org.apache.seatunnel.api.table.factory.FactoryUtil;
3535
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
3636
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
37-
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
3837
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
39-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
4038
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
4139
import org.apache.seatunnel.common.Constants;
4240
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
@@ -405,35 +403,14 @@ private void parseTransform(
405403
final String tableId =
406404
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
407405

408-
boolean fallback =
409-
isFallback(
410-
classLoader,
411-
TableTransformFactory.class,
412-
factoryId,
413-
(factory) -> factory.createTransform(null));
414-
415406
Set<Action> inputActions =
416407
inputs.stream()
417408
.map(Tuple2::_2)
418409
.collect(Collectors.toCollection(LinkedHashSet::new));
419-
SeaTunnelDataType<?> expectedType = getProducedType(inputs.get(0)._2());
420410
checkProducedTypeEquals(inputActions);
421411
int spareParallelism = inputs.get(0)._2().getParallelism();
422412
int parallelism =
423413
readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
424-
if (fallback) {
425-
Tuple2<CatalogTable, Action> tuple =
426-
fallbackParser.parseTransform(
427-
config,
428-
jobConfig,
429-
tableId,
430-
parallelism,
431-
(SeaTunnelRowType) expectedType,
432-
inputActions);
433-
tableWithActionMap.put(tableId, Collections.singletonList(tuple));
434-
return;
435-
}
436-
437414
CatalogTable catalogTable = inputs.get(0)._1();
438415
SeaTunnelTransform<?> transform =
439416
FactoryUtil.createAndPrepareTransform(
@@ -470,15 +447,10 @@ public static SeaTunnelDataType<?> getProducedType(Action action) {
470447
return ((SourceAction<?, ?, ?>) action).getSource().getProducedType();
471448
}
472449
} else if (action instanceof TransformAction) {
473-
try {
474-
return ((TransformAction) action)
475-
.getTransform()
476-
.getProducedCatalogTable()
477-
.getSeaTunnelRowType();
478-
} catch (UnsupportedOperationException e) {
479-
// TODO remove it when all connector use `getProducedCatalogTables`
480-
return ((TransformAction) action).getTransform().getProducedType();
481-
}
450+
return ((TransformAction) action)
451+
.getTransform()
452+
.getProducedCatalogTable()
453+
.getSeaTunnelRowType();
482454
}
483455
throw new UnsupportedOperationException();
484456
}

Diff for: seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void testMetricsOnJobRestart() throws InterruptedException {
137137
server.getCoordinatorService().getJobStatus(jobId3)));
138138

139139
// check metrics
140-
await().atMost(60000, TimeUnit.MILLISECONDS)
140+
await().atMost(600000, TimeUnit.MILLISECONDS)
141141
.untilAsserted(
142142
() -> {
143143
JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3);
@@ -161,12 +161,12 @@ public void testMetricsOnJobRestart() throws InterruptedException {
161161
server.getCoordinatorService().cancelJob(jobId3);
162162
}
163163

164-
private void startJob(Long jobid, String path, boolean isStartWithSavePoint) {
165-
LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid);
164+
private void startJob(Long jobId, String path, boolean isStartWithSavePoint) {
165+
LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId);
166166

167167
JobImmutableInformation jobImmutableInformation =
168168
new JobImmutableInformation(
169-
jobid,
169+
jobId,
170170
"Test",
171171
isStartWithSavePoint,
172172
nodeEngine.getSerializationService().toData(testLogicalDag),
@@ -177,7 +177,7 @@ private void startJob(Long jobid, String path, boolean isStartWithSavePoint) {
177177
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
178178

179179
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
180-
server.getCoordinatorService().submitJob(jobid, data);
180+
server.getCoordinatorService().submitJob(jobId, data);
181181
voidPassiveCompletableFuture.join();
182182
}
183183
}

Diff for: seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java

-10
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2121
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2222
import org.apache.seatunnel.api.table.catalog.TableSchema;
23-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
24-
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2523

2624
import lombok.NonNull;
2725

@@ -61,12 +59,4 @@ private CatalogTable transformCatalogTable() {
6159
protected abstract TableSchema transformTableSchema();
6260

6361
protected abstract TableIdentifier transformTableIdentifier();
64-
65-
@Override
66-
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
67-
if (outputRowType != null) {
68-
return outputRowType;
69-
}
70-
return getProducedCatalogTable().getTableSchema().toPhysicalRowDataType();
71-
}
7262
}

Diff for: seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java

-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.seatunnel.transform.common;
1919

2020
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21-
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2221
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2322
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2423
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -30,11 +29,6 @@ public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform<S
3029

3130
protected SeaTunnelRowType outputRowType;
3231

33-
@Override
34-
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
35-
return outputRowType;
36-
}
37-
3832
@Override
3933
public SeaTunnelRow map(SeaTunnelRow row) {
4034
return transformRow(row);

0 commit comments

Comments
 (0)