Skip to content

Commit e6901a3

Browse files
author
gepa
committed
Add tests for JdbcTableSinkOperator and createSqlSuffix extension point
- Add HsqldbTableSinkOperator for testing JDBC sink against HSQLDB - Add JdbcTableSinkExecutorTest with 5 tests: overwrite creates new table, overwrite replaces existing table, append inserts into existing table, and SQL clause generation for both modes - Add createSqlSuffix() extension point in JdbcTableSinkOperator for dialect-specific syntax (HSQLDB requires parenthesized subquery form) - Default suffix is empty, preserving standard SQL behavior for PostgreSQL, SQLite, MySQL, and other production databases
1 parent 441866d commit e6901a3

4 files changed

Lines changed: 335 additions & 1 deletion

File tree

wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private static void executeSinkStage(final ExecutionStage stage,
164164
stmt.execute("DROP TABLE IF EXISTS " + sinkOp.getTableName());
165165
}
166166
// Execute the composed query: CREATE TABLE x AS SELECT ... or INSERT INTO x SELECT ...
167-
final String fullSql = sinkClause + " " + selectSql;
167+
final String fullSql = sinkClause + " " + selectSql + sinkOp.createSqlSuffix();
168168
stmt.execute(fullSql);
169169
jdbcExecutor.logger.info("Executed SQL sink: {}", fullSql);
170170
} catch (SQLException e) {

wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSinkOperator.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ public String createSqlClause(Connection connection, FunctionCompiler compiler)
5656
return "INSERT INTO " + this.getTableName();
5757
}
5858

59+
/**
60+
* Returns a SQL suffix appended after the composed SELECT query.
61+
* Default is empty, which works for most databases (PostgreSQL, SQLite, MySQL).
62+
* Subclasses can potentiallyoverride for dialect-specific syntax (e.g., HSQLDB that we used for the tests requires
63+
* parenthesized subquery form: {@code CREATE TABLE x AS (SELECT ...)}).
64+
*/
65+
public String createSqlSuffix() {
66+
return "";
67+
}
68+
5969
@Override
6070
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
6171
return Collections.singletonList(this.getPlatform().getSqlQueryChannelDescriptor());
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.jdbc.execution;
20+
21+
import org.apache.wayang.core.api.Configuration;
22+
import org.apache.wayang.core.api.Job;
23+
import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
24+
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
25+
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
26+
import org.apache.wayang.core.platform.CrossPlatformExecutor;
27+
import org.apache.wayang.core.profiling.NoInstrumentationStrategy;
28+
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
29+
import org.apache.wayang.jdbc.operators.JdbcTableSinkOperator;
30+
import org.apache.wayang.jdbc.operators.JdbcTableSource;
31+
import org.apache.wayang.jdbc.test.HsqldbPlatform;
32+
import org.apache.wayang.jdbc.test.HsqldbTableSinkOperator;
33+
import org.apache.wayang.jdbc.test.HsqldbTableSource;
34+
import org.junit.jupiter.api.Test;
35+
36+
import java.sql.Connection;
37+
import java.sql.ResultSet;
38+
import java.sql.SQLException;
39+
import java.sql.Statement;
40+
import java.util.Collections;
41+
42+
import static org.junit.jupiter.api.Assertions.assertEquals;
43+
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.when;
45+
46+
/**
47+
* Test suite for in-database table sink execution via {@link JdbcExecutor#executeSinkStage}.
48+
*/
49+
class JdbcTableSinkExecutorTest {
50+
51+
@Test
52+
void testOverwriteModeCreatesNewTable() throws SQLException {
53+
Configuration configuration = new Configuration();
54+
HsqldbPlatform hsqldbPlatform = new HsqldbPlatform();
55+
56+
// Create source table with data
57+
try (Connection conn = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
58+
Statement stmt = conn.createStatement();
59+
stmt.execute("DROP TABLE IF EXISTS source_overwrite;");
60+
stmt.execute("DROP TABLE IF EXISTS target_overwrite;");
61+
stmt.execute("CREATE TABLE source_overwrite (id INT, name VARCHAR(50));");
62+
stmt.execute("INSERT INTO source_overwrite VALUES (1, 'Jenny');");
63+
stmt.execute("INSERT INTO source_overwrite VALUES (2, 'Nick');");
64+
stmt.execute("INSERT INTO source_overwrite VALUES (3, 'Klaudia');");
65+
}
66+
67+
Job job = mock(Job.class);
68+
when(job.getConfiguration()).thenReturn(configuration);
69+
when(job.getCrossPlatformExecutor()).thenReturn(
70+
new CrossPlatformExecutor(job, new NoInstrumentationStrategy()));
71+
SqlQueryChannel.Descriptor sqlChannelDescriptor =
72+
HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor();
73+
74+
//Build the execution stage source to sink
75+
ExecutionStage sqlStage = mock(ExecutionStage.class);
76+
77+
JdbcTableSource tableSource = new HsqldbTableSource("source_overwrite");
78+
ExecutionTask tableSourceTask = new ExecutionTask(tableSource);
79+
tableSourceTask.setOutputChannel(0,
80+
new SqlQueryChannel(sqlChannelDescriptor, tableSource.getOutput(0)));
81+
tableSourceTask.setStage(sqlStage);
82+
83+
JdbcTableSinkOperator sinkOp = new HsqldbTableSinkOperator(
84+
"target_overwrite", new String[]{"id", "name"});
85+
sinkOp.setMode("overwrite");
86+
ExecutionTask sinkTask = new ExecutionTask(sinkOp);
87+
sinkTask.setStage(sqlStage);
88+
tableSourceTask.getOutputChannel(0).addConsumer(sinkTask, 0);
89+
90+
when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceTask));
91+
when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(sinkTask));
92+
93+
// Execute
94+
JdbcExecutor executor = new JdbcExecutor(HsqldbPlatform.getInstance(), job);
95+
executor.execute(sqlStage, new DefaultOptimizationContext(job), job.getCrossPlatformExecutor());
96+
97+
// Verify table was created and contains all 3 rows
98+
try (Connection conn = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
99+
Statement stmt = conn.createStatement();
100+
ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM target_overwrite;");
101+
rs.next();
102+
assertEquals(3, rs.getInt(1));
103+
104+
rs = stmt.executeQuery("SELECT id, name FROM target_overwrite ORDER BY id;");
105+
rs.next();
106+
assertEquals(1, rs.getInt("id"));
107+
assertEquals("Jenny", rs.getString("name"));
108+
rs.next();
109+
assertEquals(2, rs.getInt("id"));
110+
assertEquals("Nick", rs.getString("name"));
111+
rs.next();
112+
assertEquals(3, rs.getInt("id"));
113+
assertEquals("Klaudia", rs.getString("name"));
114+
}
115+
}
116+
117+
@Test
118+
void testOverwriteModeReplacesExistingTable() throws SQLException {
119+
Configuration configuration = new Configuration();
120+
HsqldbPlatform hsqldbPlatform = new HsqldbPlatform();
121+
122+
// Create source table and a pre-existing target table
123+
try (Connection conn = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
124+
Statement stmt = conn.createStatement();
125+
stmt.execute("DROP TABLE IF EXISTS source_replace;");
126+
stmt.execute("DROP TABLE IF EXISTS target_replace;");
127+
stmt.execute("CREATE TABLE source_replace (id INT, val VARCHAR(50));");
128+
stmt.execute("INSERT INTO source_replace VALUES (1, 'new_data');");
129+
// Pre existing target table with different schema and data
130+
stmt.execute("CREATE TABLE target_replace (x INT, y INT, z INT);");
131+
stmt.execute("INSERT INTO target_replace VALUES (50, 20, 10);");
132+
}
133+
134+
Job job = mock(Job.class);
135+
when(job.getConfiguration()).thenReturn(configuration);
136+
when(job.getCrossPlatformExecutor()).thenReturn(
137+
new CrossPlatformExecutor(job, new NoInstrumentationStrategy()));
138+
SqlQueryChannel.Descriptor sqlChannelDescriptor =
139+
HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor();
140+
141+
ExecutionStage sqlStage = mock(ExecutionStage.class);
142+
143+
JdbcTableSource tableSource = new HsqldbTableSource("source_replace");
144+
ExecutionTask tableSourceTask = new ExecutionTask(tableSource);
145+
tableSourceTask.setOutputChannel(0,
146+
new SqlQueryChannel(sqlChannelDescriptor, tableSource.getOutput(0)));
147+
tableSourceTask.setStage(sqlStage);
148+
149+
JdbcTableSinkOperator sinkOp = new HsqldbTableSinkOperator(
150+
"target_replace", new String[]{"id", "val"});
151+
sinkOp.setMode("overwrite");
152+
ExecutionTask sinkTask = new ExecutionTask(sinkOp);
153+
sinkTask.setStage(sqlStage);
154+
tableSourceTask.getOutputChannel(0).addConsumer(sinkTask, 0);
155+
156+
when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceTask));
157+
when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(sinkTask));
158+
159+
JdbcExecutor executor = new JdbcExecutor(HsqldbPlatform.getInstance(), job);
160+
executor.execute(sqlStage, new DefaultOptimizationContext(job), job.getCrossPlatformExecutor());
161+
162+
// Verify target was replaced. Old data should be gone, new schema and data present
163+
try (Connection conn = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
164+
Statement stmt = conn.createStatement();
165+
ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM target_replace;");
166+
rs.next();
167+
assertEquals(1, rs.getInt(1));
168+
169+
rs = stmt.executeQuery("SELECT id, val FROM target_replace;");
170+
rs.next();
171+
assertEquals(1, rs.getInt("id"));
172+
assertEquals("new_data", rs.getString("val"));
173+
}
174+
}
175+
176+
@Test
177+
void testAppendModeInsertsIntoExistingTable() throws SQLException {
178+
Configuration configuration = new Configuration();
179+
HsqldbPlatform hsqldbPlatform = new HsqldbPlatform();
180+
181+
//Create source and target table. Target has existing data.
182+
try (Connection conn = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
183+
Statement stmt = conn.createStatement();
184+
stmt.execute("DROP TABLE IF EXISTS source_append;");
185+
stmt.execute("DROP TABLE IF EXISTS target_append;");
186+
stmt.execute("CREATE TABLE source_append (id INT, name VARCHAR(50));");
187+
stmt.execute("INSERT INTO source_append VALUES (10, 'Ten');");
188+
stmt.execute("INSERT INTO source_append VALUES (20, 'Twenty');");
189+
stmt.execute("CREATE TABLE target_append (id INT, name VARCHAR(50));");
190+
stmt.execute("INSERT INTO target_append VALUES (1, 'Existing');");
191+
}
192+
193+
Job job = mock(Job.class);
194+
when(job.getConfiguration()).thenReturn(configuration);
195+
when(job.getCrossPlatformExecutor()).thenReturn(
196+
new CrossPlatformExecutor(job, new NoInstrumentationStrategy()));
197+
SqlQueryChannel.Descriptor sqlChannelDescriptor =
198+
HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor();
199+
200+
ExecutionStage sqlStage = mock(ExecutionStage.class);
201+
202+
JdbcTableSource tableSource = new HsqldbTableSource("source_append");
203+
ExecutionTask tableSourceTask = new ExecutionTask(tableSource);
204+
tableSourceTask.setOutputChannel(0,
205+
new SqlQueryChannel(sqlChannelDescriptor, tableSource.getOutput(0)));
206+
tableSourceTask.setStage(sqlStage);
207+
208+
JdbcTableSinkOperator sinkOp = new HsqldbTableSinkOperator(
209+
"target_append", new String[]{"id", "name"});
210+
sinkOp.setMode("append");
211+
ExecutionTask sinkTask = new ExecutionTask(sinkOp);
212+
sinkTask.setStage(sqlStage);
213+
tableSourceTask.getOutputChannel(0).addConsumer(sinkTask, 0);
214+
215+
when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceTask));
216+
when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(sinkTask));
217+
218+
JdbcExecutor executor = new JdbcExecutor(HsqldbPlatform.getInstance(), job);
219+
executor.execute(sqlStage, new DefaultOptimizationContext(job), job.getCrossPlatformExecutor());
220+
221+
// Verify existing data remains and new data is appended
222+
try (Connection conn = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
223+
Statement stmt = conn.createStatement();
224+
ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM target_append;");
225+
rs.next();
226+
assertEquals(3, rs.getInt(1));
227+
228+
// Verify the pre-existing row is still there
229+
rs = stmt.executeQuery("SELECT COUNT(*) FROM target_append WHERE id = 1;");
230+
rs.next();
231+
assertEquals(1, rs.getInt(1));
232+
233+
// Verify the new rows were appended
234+
rs = stmt.executeQuery("SELECT COUNT(*) FROM target_append WHERE id >= 10;");
235+
rs.next();
236+
assertEquals(2, rs.getInt(1));
237+
}
238+
}
239+
240+
@Test
241+
void testOverwriteClauseGeneration() {
242+
JdbcTableSinkOperator sinkOp = new HsqldbTableSinkOperator(
243+
"my_table", new String[]{"col1"});
244+
sinkOp.setMode("overwrite");
245+
assertEquals("CREATE TABLE my_table AS (", sinkOp.createSqlClause(null, null));
246+
}
247+
248+
@Test
249+
void testAppendClauseGeneration() {
250+
JdbcTableSinkOperator sinkOp = new HsqldbTableSinkOperator(
251+
"my_table", new String[]{"col1"});
252+
sinkOp.setMode("append");
253+
assertEquals("INSERT INTO my_table", sinkOp.createSqlClause(null, null));
254+
}
255+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.jdbc.test;
20+
21+
import org.apache.wayang.core.platform.ChannelDescriptor;
22+
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
23+
import org.apache.wayang.jdbc.operators.JdbcTableSinkOperator;
24+
25+
import java.sql.Connection;
26+
import java.util.List;
27+
28+
/**
29+
* Test implementation of {@link JdbcTableSinkOperator}.
30+
* Overrides SQL generation to use HSQLDB's CREATE TABLE AS syntax with parentheses.
31+
*/
32+
public class HsqldbTableSinkOperator extends JdbcTableSinkOperator {
33+
34+
public HsqldbTableSinkOperator(String tableName, String[] columnNames) {
35+
super(tableName, columnNames);
36+
}
37+
38+
@Override
39+
public HsqldbPlatform getPlatform() {
40+
return HsqldbPlatform.getInstance();
41+
}
42+
43+
@Override
44+
public String createSqlClause(Connection connection, FunctionCompiler compiler) {
45+
String mode = this.getMode();
46+
if ("overwrite".equals(mode)) {
47+
return "CREATE TABLE " + this.getTableName() + " AS (";
48+
}
49+
return "INSERT INTO " + this.getTableName();
50+
}
51+
52+
@Override
53+
public String createSqlSuffix() {
54+
if ("overwrite".equals(this.getMode())) {
55+
return ") WITH DATA"; // HSQLDB requires this suffix for CREATE TABLE AS SELECT
56+
}
57+
return "";
58+
}
59+
60+
@Override
61+
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
62+
throw new UnsupportedOperationException();
63+
}
64+
65+
@Override
66+
public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
67+
throw new UnsupportedOperationException();
68+
}
69+
}

0 commit comments

Comments
 (0)