Skip to content

Commit 441866d

Browse files
author
gepa
committed
Add in-database JdbcTableSinkOperator for SQL-level table writes
- Add JdbcTableSinkOperator abstract operator in wayang-jdbc-template
- Add PostgresTableSinkOperator, Sqlite3TableSinkOperator, GenericJdbcTableSinkOperator
- Add TableSinkMapping for Postgres, Sqlite3, and GenericJdbc platforms
- Modify JdbcExecutor to handle sink stages with direct SQL execution
- Support CREATE TABLE AS SELECT (overwrite) and INSERT INTO SELECT (append)
1 parent b8bc2df commit 441866d

12 files changed

Lines changed: 468 additions & 10 deletions

File tree

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,3 @@ pom.xml.*
3535
# Scala Plugin for VSCode
3636
.metals
3737
.bloop/
38-

wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public class Mappings {
3030

3131
public static final Collection<Mapping> ALL = Arrays.asList(
3232
new FilterMapping(),
33-
new ProjectionMapping()
33+
new ProjectionMapping(),
34+
new TableSinkMapping()
3435
);
3536

3637
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.genericjdbc.mapping;
20+
21+
import org.apache.wayang.basic.operators.TableSink;
22+
import org.apache.wayang.core.mapping.Mapping;
23+
import org.apache.wayang.core.mapping.OperatorPattern;
24+
import org.apache.wayang.core.mapping.PlanTransformation;
25+
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
26+
import org.apache.wayang.core.mapping.SubplanPattern;
27+
import org.apache.wayang.genericjdbc.operators.GenericJdbcTableSinkOperator;
28+
import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform;
29+
30+
import java.util.Collection;
31+
import java.util.Collections;
32+
33+
/**
34+
* Mapping from {@link TableSink} to {@link GenericJdbcTableSinkOperator}.
35+
*/
36+
public class TableSinkMapping implements Mapping {
37+
38+
@Override
39+
public Collection<PlanTransformation> getTransformations() {
40+
return Collections.singleton(new PlanTransformation(
41+
this.createSubplanPattern(),
42+
this.createReplacementSubplanFactory(),
43+
GenericJdbcPlatform.getInstance()
44+
));
45+
}
46+
47+
private SubplanPattern createSubplanPattern() {
48+
final OperatorPattern<TableSink> operatorPattern = new OperatorPattern<>(
49+
"sink", new TableSink<>(null, null, null), false
50+
);
51+
return SubplanPattern.createSingleton(operatorPattern);
52+
}
53+
54+
private ReplacementSubplanFactory createReplacementSubplanFactory() {
55+
return new ReplacementSubplanFactory.OfSingleOperators<TableSink>(
56+
(matchedOperator, epoch) -> new GenericJdbcTableSinkOperator(matchedOperator).at(epoch)
57+
);
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.genericjdbc.operators;
20+
21+
import org.apache.wayang.basic.operators.TableSink;
22+
import org.apache.wayang.basic.data.Record;
23+
import org.apache.wayang.jdbc.operators.JdbcTableSinkOperator;
24+
25+
/**
26+
* GenericJdbc implementation of the {@link JdbcTableSinkOperator}.
27+
*/
28+
public class GenericJdbcTableSinkOperator extends JdbcTableSinkOperator implements GenericJdbcExecutionOperator {
29+
30+
public GenericJdbcTableSinkOperator(String tableName, String[] columnNames) {
31+
super(tableName, columnNames);
32+
}
33+
34+
public GenericJdbcTableSinkOperator(TableSink<Record> that) {
35+
super(that);
36+
}
37+
}

wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.wayang.jdbc.operators.JdbcFilterOperator;
4141
import org.apache.wayang.jdbc.operators.JdbcJoinOperator;
4242
import org.apache.wayang.jdbc.operators.JdbcProjectionOperator;
43+
import org.apache.wayang.jdbc.operators.JdbcTableSinkOperator;
4344
import org.apache.wayang.jdbc.operators.JdbcTableSource;
4445
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
4546
import org.apache.logging.log4j.LogManager;
@@ -52,6 +53,7 @@
5253
import java.sql.ResultSet;
5354
import java.sql.ResultSetMetaData;
5455
import java.sql.SQLException;
56+
import java.sql.Statement;
5557
import java.util.ArrayList;
5658
import java.util.Collection;
5759
import java.util.Set;
@@ -78,14 +80,96 @@ public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) {
7880

7981
@Override
8082
public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, final ExecutionState executionState) {
81-
final Tuple2<String, SqlQueryChannel.Instance> pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this);
82-
final String query = pair.field0;
83-
final SqlQueryChannel.Instance queryChannel = pair.field1;
83+
// Check if this stage ends with a sink operator
84+
final Collection<?> termTasks = stage.getTerminalTasks();
85+
assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported.";
86+
final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0];
87+
88+
if (termTask.getOperator() instanceof JdbcTableSinkOperator) {
89+
// If it is a sink stage: compose and execute SQL directly within the database
90+
JdbcExecutor.executeSinkStage(stage, optimizationContext, this);
91+
} else {
92+
//If it is normal stage: compose SQL and store in channel for downstream consumption
93+
final Tuple2<String, SqlQueryChannel.Instance> pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this);
94+
final String query = pair.field0;
95+
final SqlQueryChannel.Instance queryChannel = pair.field1;
96+
queryChannel.setSqlQuery(query);
97+
executionState.register(queryChannel);
98+
}
99+
}
84100

85-
queryChannel.setSqlQuery(query);
101+
/**
102+
* Handles execution stages that end with a {@link JdbcTableSinkOperator}.
103+
* Composes a SQL query from the stage's operators and executes it directly
104+
* on the database connection, keeping all data within the database.
105+
*
106+
* @param stage the execution stage ending with a sink
107+
* @param optimizationContext provides optimization information
108+
* @param jdbcExecutor the executor with the database connection
109+
*/
110+
private static void executeSinkStage(final ExecutionStage stage,
111+
final OptimizationContext optimizationContext,
112+
final JdbcExecutor jdbcExecutor) {
113+
final Collection<?> startTasks = stage.getStartTasks();
114+
final Collection<?> termTasks = stage.getTerminalTasks();
86115

87-
// Return the tipChannelInstance.
88-
executionState.register(queryChannel);
116+
assert startTasks.size() == 1 : "Invalid JDBC stage: multiple sources are not currently supported";
117+
final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0];
118+
assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported.";
119+
final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0];
120+
assert startTask.getOperator() instanceof TableSource
121+
: "Invalid JDBC stage: Start task has to be a TableSource";
122+
assert termTask.getOperator() instanceof JdbcTableSinkOperator
123+
: "Invalid JDBC stage: Terminal task has to be a JdbcTableSinkOperator";
124+
125+
// Extract operators from the stage
126+
final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator();
127+
final JdbcTableSinkOperator sinkOp = (JdbcTableSinkOperator) termTask.getOperator();
128+
final Collection<JdbcFilterOperator> filterTasks = new ArrayList<>(4);
129+
JdbcProjectionOperator projectionTask = null;
130+
final Collection<JdbcJoinOperator<?>> joinTasks = new ArrayList<>();
131+
132+
// Walk through intermediate operators, stopping at the sink
133+
ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage);
134+
while (nextTask != null && !(nextTask.getOperator() instanceof JdbcTableSinkOperator)) {
135+
if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) {
136+
filterTasks.add(filterOperator);
137+
} else if (nextTask.getOperator() instanceof JdbcProjectionOperator projectionOperator) {
138+
assert projectionTask == null;
139+
projectionTask = projectionOperator;
140+
} else if (nextTask.getOperator() instanceof JdbcJoinOperator joinOperator) {
141+
joinTasks.add(joinOperator);
142+
} else {
143+
throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString()));
144+
}
145+
nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage);
146+
}
147+
148+
// Compose the SELECT query
149+
final StringBuilder selectQuery = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);
150+
151+
// Remove trailing semicolon from SELECT
152+
String selectSql = selectQuery.toString();
153+
if (selectSql.endsWith(";")) {
154+
selectSql = selectSql.substring(0, selectSql.length() - 1);
155+
}
156+
157+
// Get the sink's SQL clause
158+
final String sinkClause = sinkOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
159+
160+
// Execute on the database
161+
try (Statement stmt = jdbcExecutor.connection.createStatement()) {
162+
// Handle overwrite: drop existing table first
163+
if ("overwrite".equals(sinkOp.getMode())) {
164+
stmt.execute("DROP TABLE IF EXISTS " + sinkOp.getTableName());
165+
}
166+
// Execute the composed query: CREATE TABLE x AS SELECT ... or INSERT INTO x SELECT ...
167+
final String fullSql = sinkClause + " " + selectSql;
168+
stmt.execute(fullSql);
169+
jdbcExecutor.logger.info("Executed SQL sink: {}", fullSql);
170+
} catch (SQLException e) {
171+
throw new WayangException("Failed to execute SQL sink on table: " + sinkOp.getTableName(), e);
172+
}
89173
}
90174

91175
/**
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.jdbc.operators;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import org.apache.wayang.basic.operators.TableSink;
23+
import org.apache.wayang.core.api.Configuration;
24+
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
25+
import org.apache.wayang.core.platform.ChannelDescriptor;
26+
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
27+
28+
import java.sql.Connection;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.Optional;
32+
33+
/**
34+
* Abstract JDBC-based implementation of {@link TableSink} that operates within
35+
* the {@link org.apache.wayang.jdbc.channels.SqlQueryChannel} ecosystem.
36+
* Instead of pulling data into Java/Spark memory and inserting via JDBC,
37+
* this operator wraps the composed SQL query in a CREATE TABLE AS SELECT
38+
* or INSERT INTO ... SELECT statement, keeping all data within the database.
39+
*/
40+
public abstract class JdbcTableSinkOperator extends TableSink<Record> implements JdbcExecutionOperator {
41+
42+
public JdbcTableSinkOperator(String tableName, String[] columnNames) {
43+
super(null, null, tableName, columnNames);
44+
}
45+
46+
public JdbcTableSinkOperator(TableSink<Record> that) {
47+
super(that);
48+
}
49+
50+
@Override
51+
public String createSqlClause(Connection connection, FunctionCompiler compiler) {
52+
String mode = this.getMode();
53+
if ("overwrite".equals(mode)) {
54+
return "CREATE TABLE " + this.getTableName() + " AS";
55+
}
56+
return "INSERT INTO " + this.getTableName();
57+
}
58+
59+
@Override
60+
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
61+
return Collections.singletonList(this.getPlatform().getSqlQueryChannelDescriptor());
62+
}
63+
64+
@Override
65+
public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
66+
throw new UnsupportedOperationException("This operator has no outputs.");
67+
}
68+
69+
@Override
70+
public String getLoadProfileEstimatorConfigurationKey() {
71+
return String.format("wayang.%s.tablesink.load", this.getPlatform().getPlatformId());
72+
}
73+
74+
@Override
75+
public Optional<LoadProfileEstimator> createLoadProfileEstimator(Configuration configuration) {
76+
return JdbcExecutionOperator.super.createLoadProfileEstimator(configuration);
77+
}
78+
}

wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/mapping/Mappings.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ public class Mappings {
3131
public static final Collection<Mapping> ALL = Arrays.asList(
3232
new FilterMapping(),
3333
new JoinMapping(),
34-
new ProjectionMapping()
34+
new ProjectionMapping(),
35+
new TableSinkMapping()
3536
);
3637

3738
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.postgres.mapping;
20+
21+
import org.apache.wayang.basic.operators.TableSink;
22+
import org.apache.wayang.core.mapping.Mapping;
23+
import org.apache.wayang.core.mapping.OperatorPattern;
24+
import org.apache.wayang.core.mapping.PlanTransformation;
25+
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
26+
import org.apache.wayang.core.mapping.SubplanPattern;
27+
import org.apache.wayang.postgres.operators.PostgresTableSinkOperator;
28+
import org.apache.wayang.postgres.platform.PostgresPlatform;
29+
30+
import java.util.Collection;
31+
import java.util.Collections;
32+
33+
/**
34+
* Mapping from {@link TableSink} to {@link PostgresTableSinkOperator}.
35+
*/
36+
public class TableSinkMapping implements Mapping {
37+
38+
@Override
39+
public Collection<PlanTransformation> getTransformations() {
40+
return Collections.singleton(new PlanTransformation(
41+
this.createSubplanPattern(),
42+
this.createReplacementSubplanFactory(),
43+
PostgresPlatform.getInstance()
44+
));
45+
}
46+
47+
private SubplanPattern createSubplanPattern() {
48+
final OperatorPattern<TableSink> operatorPattern = new OperatorPattern<>(
49+
"sink", new TableSink<>(null, null, null), false
50+
);
51+
return SubplanPattern.createSingleton(operatorPattern);
52+
}
53+
54+
private ReplacementSubplanFactory createReplacementSubplanFactory() {
55+
return new ReplacementSubplanFactory.OfSingleOperators<TableSink>(
56+
(matchedOperator, epoch) -> new PostgresTableSinkOperator(matchedOperator).at(epoch)
57+
);
58+
}
59+
}

0 commit comments

Comments
 (0)