Skip to content

Commit 0cdab3e

Browse files
authored
Merge pull request #385 from juripetersen/384-postgres-join
Add JdbcJoinOperator and PostgresJoinOperator
2 parents 5e6a07e + 6554ed9 commit 0cdab3e

File tree

9 files changed

+446
-3
lines changed

9 files changed

+446
-3
lines changed

wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/TransformationDescriptor.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.wayang.core.function;
2020

21+
import org.apache.wayang.core.util.Tuple;
2122
import org.apache.wayang.core.optimizer.costs.LoadEstimator;
2223
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
2324
import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;
@@ -39,6 +40,8 @@ public class TransformationDescriptor<Input, Output> extends FunctionDescriptor
3940

4041
private final FunctionDescriptor.SerializableFunction<Input, Output> javaImplementation;
4142

43+
private Tuple<String, String> sqlImplementation;
44+
4245
public TransformationDescriptor(FunctionDescriptor.SerializableFunction<Input, Output> javaImplementation,
4346
Class<Input> inputTypeClass,
4447
Class<Output> outputTypeClass) {
@@ -88,6 +91,28 @@ public Function<Input, Output> getJavaImplementation() {
8891
return this.javaImplementation;
8992
}
9093

94+
/**
95+
* This function is not built to last. It is thought to help out devising programs while we are still figuring
96+
* out how to express functions in a platform-independent way.
97+
*
98+
* @return a Tuple holding tableName and a SQL predicate applicable in a {@code JOIN} clause
99+
*/
100+
public Tuple<String, String> getSqlImplementation() {
101+
return this.sqlImplementation;
102+
}
103+
104+
/**
105+
* This function is not built to last. It is thought to help out devising programs while we are still figuring
106+
* out how to express functions in a platform-independent way.
107+
*
108+
* @param tableName a SQL table name applicable in a {@code JOIN TABLE ON}.
109+
* @param sqlImplementation a SQL predicate applicable in a {@code WHERE} clause representing this predicate
110+
*/
111+
public TransformationDescriptor<Input, Output> withSqlImplementation(String tableName, String sqlImplementation) {
112+
this.sqlImplementation = new Tuple<String, String>(tableName, sqlImplementation);
113+
return this;
114+
}
115+
91116
/**
92117
* In generic code, we do not have the type parameter values of operators, functions etc. This method avoids casting issues.
93118
*

wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
3939
import org.apache.wayang.jdbc.operators.JdbcExecutionOperator;
4040
import org.apache.wayang.jdbc.operators.JdbcFilterOperator;
41+
import org.apache.wayang.jdbc.operators.JdbcJoinOperator;
4142
import org.apache.wayang.jdbc.operators.JdbcProjectionOperator;
4243
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
4344
import org.apache.logging.log4j.LogManager;
@@ -92,6 +93,7 @@ public void execute(ExecutionStage stage, OptimizationContext optimizationContex
9293
SqlQueryChannel.Instance tipChannelInstance = this.instantiateOutboundChannel(startTask, optimizationContext);
9394
Collection<ExecutionTask> filterTasks = new ArrayList<>(4);
9495
ExecutionTask projectionTask = null;
96+
Collection<ExecutionTask> joinTasks = new ArrayList<>();
9597
Set<ExecutionTask> allTasks = stage.getAllTasks();
9698
assert allTasks.size() <= 3;
9799
ExecutionTask nextTask = this.findJdbcExecutionOperatorTaskInStage(startTask, stage);
@@ -102,7 +104,8 @@ public void execute(ExecutionStage stage, OptimizationContext optimizationContex
102104
} else if (nextTask.getOperator() instanceof JdbcProjectionOperator) {
103105
assert projectionTask == null; //Allow one projection operator per stage for now.
104106
projectionTask = nextTask;
105-
107+
} else if (nextTask.getOperator() instanceof JdbcJoinOperator) {
108+
joinTasks.add(nextTask);
106109
} else {
107110
throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString()));
108111
}
@@ -121,7 +124,11 @@ public void execute(ExecutionStage stage, OptimizationContext optimizationContex
121124
.map(this::getSqlClause)
122125
.collect(Collectors.toList());
123126
String projection = projectionTask == null ? "*" : this.getSqlClause(projectionTask.getOperator());
124-
String query = this.createSqlQuery(tableName, conditions, projection);
127+
Collection<String> joins = joinTasks.stream()
128+
.map(ExecutionTask::getOperator)
129+
.map(this::getSqlClause)
130+
.collect(Collectors.toList());
131+
String query = this.createSqlQuery(tableName, conditions, projection, joins);
125132
tipChannelInstance.setSqlQuery(query);
126133

127134
// Return the tipChannelInstance.
@@ -184,11 +191,18 @@ private SqlQueryChannel.Instance instantiateOutboundChannel(ExecutionTask task,
184191
* @param tableName the table to be queried
185192
* @param conditions conditions for the {@code WHERE} clause
186193
* @param projection projection for the {@code SELECT} clause
194+
* @param joins join clauses for multiple {@code JOIN} clauses
187195
* @return the SQL query
188196
*/
189-
protected String createSqlQuery(String tableName, Collection<String> conditions, String projection) {
197+
protected String createSqlQuery(String tableName, Collection<String> conditions, String projection, Collection<String> joins) {
190198
StringBuilder sb = new StringBuilder(1000);
191199
sb.append("SELECT ").append(projection).append(" FROM ").append(tableName);
200+
if (!joins.isEmpty()) {
201+
String separator = " ";
202+
for (String join : joins) {
203+
sb.append(separator).append(join);
204+
}
205+
}
192206
if (!conditions.isEmpty()) {
193207
sb.append(" WHERE ");
194208
String separator = "";

wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcExecutionOperator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public interface JdbcExecutionOperator extends ExecutionOperator {
3434
* Creates a SQL clause for this instance. For {@link TableSource}s it returns an identifier for the table
3535
* usable in a {@code FROM} clause. For {@link JdbcProjectionOperator}s it returns a list usable in a
3636
* {@code SELECT} clause. For {@link JdbcFilterOperator}s it creates a condition usable in a {@code WHERE} clause.
37+
* For {@link JdbcJoinOperator} it returns a INNER JOIN statement usable in a {@code JOIN} clause.
3738
* Also, these different clauses should be compatible for connected {@link JdbcExecutionOperator}s.
3839
*
3940
* @param compiler used to create SQL code
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.jdbc.operators;
20+
21+
import org.apache.wayang.core.util.Tuple;
22+
import org.apache.wayang.basic.data.Record;
23+
import org.apache.wayang.core.types.DataSetType;
24+
import org.apache.wayang.basic.operators.JoinOperator;
25+
import org.apache.wayang.core.function.TransformationDescriptor;
26+
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
27+
import org.apache.wayang.core.api.Configuration;
28+
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
29+
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
30+
31+
import java.sql.Connection;
32+
import java.util.Optional;
33+
34+
/**
35+
* PostgreSQL implementation for the {@link JoinOperator}.
36+
*/
37+
public abstract class JdbcJoinOperator<KeyType>
38+
extends JoinOperator<Record, Record, KeyType>
39+
implements JdbcExecutionOperator {
40+
41+
/**
42+
* Creates a new instance.
43+
*
44+
* @see JoinOperator#JoinOperator(Record, Record...)
45+
*/
46+
public JdbcJoinOperator(
47+
TransformationDescriptor<Record, KeyType> keyDescriptor0,
48+
TransformationDescriptor<Record, KeyType> keyDescriptor1
49+
) {
50+
super(
51+
keyDescriptor0,
52+
keyDescriptor1,
53+
DataSetType.createDefault(Record.class),
54+
DataSetType.createDefault(Record.class)
55+
);
56+
}
57+
58+
/**
59+
* Copies an instance
60+
*
61+
* @param that that should be copied
62+
*/
63+
public JdbcJoinOperator(JoinOperator<Record, Record, KeyType> that) {
64+
super(that);
65+
}
66+
67+
@Override
68+
public String createSqlClause(Connection connection, FunctionCompiler compiler) {
69+
final Tuple<String, String> left = this.keyDescriptor0.getSqlImplementation();
70+
final Tuple<String, String> right = this.keyDescriptor1.getSqlImplementation();
71+
final String leftTableName = left.field0;
72+
final String leftKey = left.field1;
73+
final String rightTableName = right.field0;
74+
final String rightKey = right.field1;
75+
76+
return "JOIN " + leftTableName + " ON " +
77+
rightTableName + "." + rightKey
78+
+ "=" + leftTableName + "." + leftKey;
79+
}
80+
81+
@Override
82+
public String getLoadProfileEstimatorConfigurationKey() {
83+
return String.format("wayang.%s.join.load", this.getPlatform().getPlatformId());
84+
}
85+
86+
@Override
87+
public Optional<LoadProfileEstimator> createLoadProfileEstimator(Configuration configuration) {
88+
final Optional<LoadProfileEstimator> optEstimator =
89+
JdbcExecutionOperator.super.createLoadProfileEstimator(configuration);
90+
LoadProfileEstimators.nestUdfEstimator(optEstimator, this.keyDescriptor0, configuration);
91+
LoadProfileEstimators.nestUdfEstimator(optEstimator, this.keyDescriptor1, configuration);
92+
return optEstimator;
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.jdbc.operators;
20+
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
import org.apache.wayang.basic.data.Record;
24+
import org.apache.wayang.core.api.Configuration;
25+
import org.apache.wayang.core.api.Job;
26+
import org.apache.wayang.core.function.PredicateDescriptor;
27+
import org.apache.wayang.core.optimizer.OptimizationContext;
28+
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
29+
import org.apache.wayang.core.plan.executionplan.ExecutionStage;
30+
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
31+
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
32+
import org.apache.wayang.core.platform.ChannelInstance;
33+
import org.apache.wayang.core.platform.CrossPlatformExecutor;
34+
import org.apache.wayang.core.profiling.FullInstrumentationStrategy;
35+
import org.apache.wayang.java.channels.StreamChannel;
36+
import org.apache.wayang.java.execution.JavaExecutor;
37+
import org.apache.wayang.java.platform.JavaPlatform;
38+
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
39+
import org.apache.wayang.jdbc.test.HsqldbJoinOperator;
40+
import org.apache.wayang.jdbc.test.HsqldbPlatform;
41+
import org.apache.wayang.jdbc.test.HsqldbTableSource;
42+
import org.apache.wayang.core.function.TransformationDescriptor;
43+
import org.apache.wayang.jdbc.execution.JdbcExecutor;
44+
import org.apache.wayang.core.profiling.NoInstrumentationStrategy;
45+
import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
46+
47+
import java.sql.Connection;
48+
import java.sql.SQLException;
49+
import java.sql.Statement;
50+
import java.util.Arrays;
51+
import java.util.List;
52+
import java.util.stream.Collectors;
53+
import java.util.Collections;
54+
55+
import static org.mockito.Mockito.mock;
56+
import static org.mockito.Mockito.when;
57+
58+
/**
59+
* Test suite for {@link SqlToStreamOperator}.
60+
*/
61+
public class JdbcJoinOperatorTest extends OperatorTestBase {
62+
@Test
63+
public void testWithHsqldb() throws SQLException {
64+
Configuration configuration = new Configuration();
65+
66+
Job job = mock(Job.class);
67+
when(job.getConfiguration()).thenReturn(configuration);
68+
when(job.getCrossPlatformExecutor()).thenReturn(new CrossPlatformExecutor(job, new NoInstrumentationStrategy()));
69+
SqlQueryChannel.Descriptor sqlChannelDescriptor = HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor();
70+
71+
HsqldbPlatform hsqldbPlatform = new HsqldbPlatform();
72+
73+
ExecutionStage sqlStage = mock(ExecutionStage.class);
74+
75+
// Create some test data.
76+
try (Connection jdbcConnection = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
77+
final Statement statement = jdbcConnection.createStatement();
78+
statement.execute("CREATE TABLE testA (a INT, b VARCHAR(6));");
79+
statement.execute("INSERT INTO testA VALUES (0, 'zero');");
80+
statement.execute("CREATE TABLE testB (a INT, b INT);");
81+
statement.execute("INSERT INTO testB VALUES (0, 100);");
82+
}
83+
84+
JdbcTableSource tableSourceA = new HsqldbTableSource("testA");
85+
JdbcTableSource tableSourceB = new HsqldbTableSource("testB");
86+
87+
ExecutionTask tableSourceATask = new ExecutionTask(tableSourceA);
88+
tableSourceATask.setOutputChannel(0, new SqlQueryChannel(sqlChannelDescriptor, tableSourceA.getOutput(0)));
89+
tableSourceATask.setStage(sqlStage);
90+
91+
ExecutionTask tableSourceBTask = new ExecutionTask(tableSourceB);
92+
tableSourceBTask.setOutputChannel(0, new SqlQueryChannel(sqlChannelDescriptor, tableSourceB.getOutput(0)));
93+
tableSourceBTask.setStage(sqlStage);
94+
95+
final ExecutionOperator joinOperator = new HsqldbJoinOperator<Integer>(
96+
new TransformationDescriptor<Record, Integer>(
97+
(record) -> (Integer) record.getField(0),
98+
Record.class,
99+
Integer.class
100+
).withSqlImplementation("testA", "a"),
101+
new TransformationDescriptor<Record, Integer>(
102+
(record) -> (Integer) record.getField(0),
103+
Record.class,
104+
Integer.class
105+
).withSqlImplementation("testB", "a")
106+
);
107+
108+
ExecutionTask joinTask = new ExecutionTask(joinOperator);
109+
tableSourceATask.getOutputChannel(0).addConsumer(joinTask, 0);
110+
tableSourceBTask.getOutputChannel(0).addConsumer(joinTask, 1);
111+
joinTask.setOutputChannel(0, new SqlQueryChannel(sqlChannelDescriptor, joinOperator.getOutput(0)));
112+
joinTask.setStage(sqlStage);
113+
114+
when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceATask));
115+
when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(joinTask));
116+
117+
ExecutionStage nextStage = mock(ExecutionStage.class);
118+
119+
SqlToStreamOperator sqlToStreamOperator = new SqlToStreamOperator(HsqldbPlatform.getInstance());
120+
ExecutionTask sqlToStreamTask = new ExecutionTask(sqlToStreamOperator);
121+
joinTask.getOutputChannel(0).addConsumer(sqlToStreamTask, 0);
122+
sqlToStreamTask.setStage(nextStage);
123+
124+
JdbcExecutor executor = new JdbcExecutor(HsqldbPlatform.getInstance(), job);
125+
executor.execute(sqlStage, new DefaultOptimizationContext(job), job.getCrossPlatformExecutor());
126+
127+
SqlQueryChannel.Instance sqlQueryChannelInstance =
128+
(SqlQueryChannel.Instance) job.getCrossPlatformExecutor().getChannelInstance(sqlToStreamTask.getInputChannel(0));
129+
130+
System.out.println();
131+
132+
Assert.assertEquals(
133+
"SELECT * FROM testA JOIN testA ON testB.a=testA.a;",
134+
sqlQueryChannelInstance.getSqlQuery()
135+
);
136+
}
137+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.jdbc.test;
20+
21+
import org.apache.wayang.core.platform.ChannelDescriptor;
22+
import org.apache.wayang.basic.data.Record;
23+
import org.apache.wayang.jdbc.test.HsqldbPlatform;
24+
import org.apache.wayang.jdbc.operators.JdbcJoinOperator;
25+
import org.apache.wayang.core.types.DataSetType;
26+
import org.apache.wayang.core.function.TransformationDescriptor;
27+
28+
/**
29+
* Test implementation of {@link JdbcJoinOperator}.
30+
*/
31+
public class HsqldbJoinOperator<KeyType> extends JdbcJoinOperator<KeyType> {
32+
33+
public HsqldbJoinOperator(
34+
TransformationDescriptor<Record, KeyType> keyDescriptor0,
35+
TransformationDescriptor<Record, KeyType> keyDescriptor1
36+
) {
37+
super(keyDescriptor0,keyDescriptor1);
38+
}
39+
40+
@Override
41+
public HsqldbPlatform getPlatform() {
42+
return HsqldbPlatform.getInstance();
43+
}
44+
45+
}

0 commit comments

Comments
 (0)