Skip to content

Commit f68bf88

Browse files
authored
Merge pull request #370 from damik3/sql-to-rdd-channel
Implemented SQL to RDD channel conversion.
2 parents cd57bf3 + 774a25d commit f68bf88

File tree

7 files changed

+330
-7
lines changed

7 files changed

+330
-7
lines changed

wayang-platforms/wayang-jdbc-template/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@
7878
<artifactId>hadoop-common</artifactId>
7979
<scope>test</scope>
8080
</dependency>
81+
<dependency>
82+
<groupId>org.apache.wayang</groupId>
83+
<artifactId>wayang-spark_2.12</artifactId>
84+
<version>0.7.1-SNAPSHOT</version>
85+
</dependency>
8186
</dependencies>
8287

8388

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.jdbc.operators;
20+
21+
import org.apache.spark.api.java.JavaRDD;
22+
import org.apache.wayang.basic.data.Record;
23+
import org.apache.wayang.core.optimizer.OptimizationContext;
24+
import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
25+
import org.apache.wayang.core.platform.ChannelDescriptor;
26+
import org.apache.wayang.core.platform.ChannelInstance;
27+
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
28+
import org.apache.wayang.core.types.DataSetType;
29+
import org.apache.wayang.core.util.JsonSerializable;
30+
import org.apache.wayang.core.util.Tuple;
31+
import org.apache.wayang.core.util.json.WayangJsonObj;
32+
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
33+
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
34+
import org.apache.wayang.spark.channels.RddChannel;
35+
import org.apache.wayang.spark.execution.SparkExecutor;
36+
import org.apache.wayang.spark.operators.SparkExecutionOperator;
37+
38+
import java.sql.Connection;
39+
import java.util.Collection;
40+
import java.util.Collections;
41+
import java.util.Iterator;
42+
import java.util.List;
43+
import java.util.stream.Collectors;
44+
import java.util.stream.StreamSupport;
45+
46+
public class SqlToRddOperator extends UnaryToUnaryOperator<Record, Record> implements SparkExecutionOperator, JsonSerializable {
47+
48+
private final JdbcPlatformTemplate jdbcPlatform;
49+
50+
public SqlToRddOperator(JdbcPlatformTemplate jdbcPlatform) {
51+
this(jdbcPlatform, DataSetType.createDefault(Record.class));
52+
}
53+
54+
public SqlToRddOperator(JdbcPlatformTemplate jdbcPlatform, DataSetType<Record> dataSetType) {
55+
super(dataSetType, dataSetType, false);
56+
this.jdbcPlatform = jdbcPlatform;
57+
}
58+
59+
protected SqlToRddOperator(SqlToRddOperator that) {
60+
super(that);
61+
this.jdbcPlatform = that.jdbcPlatform;
62+
}
63+
64+
@Override
65+
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
66+
return Collections.singletonList(this.jdbcPlatform.getSqlQueryChannelDescriptor());
67+
}
68+
69+
@Override
70+
public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
71+
return Collections.singletonList(RddChannel.UNCACHED_DESCRIPTOR);
72+
}
73+
74+
@Override
75+
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
76+
ChannelInstance[] inputs,
77+
ChannelInstance[] outputs,
78+
SparkExecutor executor,
79+
OptimizationContext.OperatorContext operatorContext) {
80+
// Cast the inputs and outputs.
81+
final SqlQueryChannel.Instance input = (SqlQueryChannel.Instance) inputs[0];
82+
final RddChannel.Instance output = (RddChannel.Instance) outputs[0];
83+
84+
JdbcPlatformTemplate producerPlatform = (JdbcPlatformTemplate) input.getChannel().getProducer().getPlatform();
85+
final Connection connection = producerPlatform
86+
.createDatabaseDescriptor(executor.getConfiguration())
87+
.createJdbcConnection();
88+
89+
Iterator<Record> resultSetIterator = new SqlToStreamOperator.ResultSetIterator(connection, input.getSqlQuery());
90+
Iterable<Record> resultSetIterable = () -> resultSetIterator;
91+
92+
// Convert the ResultSet to a JavaRDD.
93+
JavaRDD<Record> resultSetRDD = executor.sc.parallelize(
94+
StreamSupport.stream(resultSetIterable.spliterator(), false).collect(Collectors.toList()),
95+
executor.getNumDefaultPartitions()
96+
);
97+
98+
output.accept(resultSetRDD, executor);
99+
100+
// TODO: Add load profile estimators
101+
ExecutionLineageNode queryLineageNode = new ExecutionLineageNode(operatorContext);
102+
queryLineageNode.addPredecessor(input.getLineage());
103+
ExecutionLineageNode outputLineageNode = new ExecutionLineageNode(operatorContext);
104+
output.getLineage().addPredecessor(outputLineageNode);
105+
106+
return queryLineageNode.collectAndMark();
107+
}
108+
109+
@Override
110+
public boolean containsAction() {
111+
return false;
112+
}
113+
114+
@Override
115+
public WayangJsonObj toJson() {
116+
return null;
117+
}
118+
}

wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToStreamOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
144144
/**
145145
* Exposes a {@link ResultSet} as an {@link Iterator}.
146146
*/
147-
private static class ResultSetIterator implements Iterator<Record>, AutoCloseable {
147+
public static class ResultSetIterator implements Iterator<Record>, AutoCloseable {
148148

149149
/**
150150
* Keeps around the {@link ResultSet} of the SQL query.

wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/OperatorTestBase.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.wayang.jdbc.operators;
2020

21-
import org.junit.BeforeClass;
2221
import org.apache.wayang.core.api.Configuration;
2322
import org.apache.wayang.core.api.Job;
2423
import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
@@ -31,6 +30,10 @@
3130
import org.apache.wayang.java.execution.JavaExecutor;
3231
import org.apache.wayang.java.operators.JavaExecutionOperator;
3332
import org.apache.wayang.java.platform.JavaPlatform;
33+
import org.apache.wayang.spark.execution.SparkExecutor;
34+
import org.apache.wayang.spark.operators.SparkExecutionOperator;
35+
import org.apache.wayang.spark.platform.SparkPlatform;
36+
import org.junit.BeforeClass;
3437

3538
import static org.mockito.Mockito.mock;
3639
import static org.mockito.Mockito.when;
@@ -52,6 +55,11 @@ protected static JavaExecutor createJavaExecutor() {
5255
return new JavaExecutor(JavaPlatform.getInstance(), job);
5356
}
5457

58+
protected static SparkExecutor createSparkExecutor() {
59+
final Job job = createJob();
60+
return new SparkExecutor(SparkPlatform.getInstance(), job);
61+
}
62+
5563
private static Job createJob() {
5664
final Job job = mock(Job.class);
5765
when(job.getConfiguration()).thenReturn(configuration);
@@ -70,4 +78,10 @@ protected static void evaluate(JavaExecutionOperator operator,
7078
operator.evaluate(inputs, outputs, createJavaExecutor(), createOperatorContext(operator));
7179
}
7280

81+
protected static void evaluate(SparkExecutionOperator operator,
82+
ChannelInstance[] inputs,
83+
ChannelInstance[] outputs) {
84+
operator.evaluate(inputs, outputs, createSparkExecutor(), createOperatorContext(operator));
85+
}
86+
7387
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.jdbc.operators;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import org.apache.wayang.core.api.Configuration;
23+
import org.apache.wayang.core.api.Job;
24+
import org.apache.wayang.core.function.PredicateDescriptor;
25+
import org.apache.wayang.core.optimizer.OptimizationContext;
26+
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
27+
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
28+
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
29+
import org.apache.wayang.core.platform.ChannelInstance;
30+
import org.apache.wayang.core.platform.CrossPlatformExecutor;
31+
import org.apache.wayang.core.profiling.FullInstrumentationStrategy;
32+
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
33+
import org.apache.wayang.jdbc.test.HsqldbFilterOperator;
34+
import org.apache.wayang.jdbc.test.HsqldbPlatform;
35+
import org.apache.wayang.spark.channels.RddChannel;
36+
import org.apache.wayang.spark.execution.SparkExecutor;
37+
import org.apache.wayang.spark.platform.SparkPlatform;
38+
import org.junit.Assert;
39+
import org.junit.Test;
40+
41+
import java.sql.Connection;
42+
import java.sql.SQLException;
43+
import java.sql.Statement;
44+
import java.util.Arrays;
45+
import java.util.List;
46+
47+
import static org.mockito.Mockito.mock;
48+
import static org.mockito.Mockito.when;
49+
50+
public class SqlToRddOperatorTest extends OperatorTestBase {
51+
52+
@Test
53+
public void testWithHsqldb() throws SQLException {
54+
Configuration configuration = new Configuration();
55+
56+
Job job = mock(Job.class);
57+
when(job.getConfiguration()).thenReturn(configuration);
58+
59+
CrossPlatformExecutor cpe = new CrossPlatformExecutor(job, new FullInstrumentationStrategy());
60+
when(job.getCrossPlatformExecutor()).thenReturn(cpe);
61+
final SparkExecutor sparkExecutor = new SparkExecutor(SparkPlatform.getInstance(), job);
62+
63+
HsqldbPlatform hsqldbPlatform = new HsqldbPlatform();
64+
65+
// Create some test data.
66+
try (Connection jdbcConnection = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
67+
final Statement statement = jdbcConnection.createStatement();
68+
statement.execute("CREATE TABLE testSqlToRddWithHsqldb (a INT, b VARCHAR(6));");
69+
statement.execute("INSERT INTO testSqlToRddWithHsqldb VALUES (0, 'zero');");
70+
statement.execute("INSERT INTO testSqlToRddWithHsqldb VALUES (1, 'one');");
71+
statement.execute("INSERT INTO testSqlToRddWithHsqldb VALUES (2, 'two');");
72+
}
73+
74+
final ExecutionOperator filterOperator = new HsqldbFilterOperator(
75+
new PredicateDescriptor<>(x -> false, Record.class)
76+
);
77+
final SqlQueryChannel sqlQueryChannel = new SqlQueryChannel(
78+
HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor(),
79+
filterOperator.getOutput(0)
80+
);
81+
SqlQueryChannel.Instance sqlQueryChannelInstance = sqlQueryChannel.createInstance(
82+
hsqldbPlatform.createExecutor(job),
83+
mock(OptimizationContext.OperatorContext.class),
84+
0
85+
);
86+
sqlQueryChannelInstance.setSqlQuery("SELECT * FROM testSqlToRddWithHsqldb;");
87+
ExecutionTask producer = new ExecutionTask(filterOperator);
88+
producer.setOutputChannel(0, sqlQueryChannel);
89+
90+
RddChannel.Instance rddChannelInstance =
91+
new RddChannel(RddChannel.UNCACHED_DESCRIPTOR, mock(OutputSlot.class)).createInstance(
92+
sparkExecutor,
93+
mock(OptimizationContext.OperatorContext.class),
94+
0
95+
);
96+
97+
SqlToRddOperator sqlToRddOperator = new SqlToRddOperator(HsqldbPlatform.getInstance());
98+
evaluate(
99+
sqlToRddOperator,
100+
new ChannelInstance[]{sqlQueryChannelInstance},
101+
new ChannelInstance[]{rddChannelInstance}
102+
);
103+
104+
List<Record> output = rddChannelInstance.<Record>provideRdd().collect();
105+
List<Record> expected = Arrays.asList(
106+
new Record(0, "zero"),
107+
new Record(1, "one"),
108+
new Record(2, "two")
109+
);
110+
111+
Assert.assertEquals(expected, output);
112+
}
113+
114+
@Test
115+
public void testWithEmptyHsqldb() throws SQLException {
116+
Configuration configuration = new Configuration();
117+
118+
Job job = mock(Job.class);
119+
when(job.getConfiguration()).thenReturn(configuration);
120+
121+
CrossPlatformExecutor cpe = new CrossPlatformExecutor(job, new FullInstrumentationStrategy());
122+
when(job.getCrossPlatformExecutor()).thenReturn(cpe);
123+
final SparkExecutor sparkExecutor = new SparkExecutor(SparkPlatform.getInstance(), job);
124+
125+
HsqldbPlatform hsqldbPlatform = new HsqldbPlatform();
126+
127+
// Create some test data.
128+
try (Connection jdbcConnection = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
129+
final Statement statement = jdbcConnection.createStatement();
130+
statement.execute("CREATE TABLE testSqlToRddWithEmptyHsqldb (a INT, b VARCHAR(6));");
131+
}
132+
133+
final ExecutionOperator filterOperator = new HsqldbFilterOperator(
134+
new PredicateDescriptor<>(x -> false, Record.class)
135+
);
136+
final SqlQueryChannel sqlQueryChannel = new SqlQueryChannel(
137+
HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor(),
138+
filterOperator.getOutput(0)
139+
);
140+
SqlQueryChannel.Instance sqlQueryChannelInstance = sqlQueryChannel.createInstance(
141+
hsqldbPlatform.createExecutor(job),
142+
mock(OptimizationContext.OperatorContext.class),
143+
0
144+
);
145+
sqlQueryChannelInstance.setSqlQuery("SELECT * FROM testSqlToRddWithEmptyHsqldb;");
146+
ExecutionTask producer = new ExecutionTask(filterOperator);
147+
producer.setOutputChannel(0, sqlQueryChannel);
148+
149+
RddChannel.Instance rddChannelInstance =
150+
new RddChannel(RddChannel.UNCACHED_DESCRIPTOR, mock(OutputSlot.class)).createInstance(
151+
sparkExecutor,
152+
mock(OptimizationContext.OperatorContext.class),
153+
0
154+
);
155+
156+
SqlToRddOperator sqlToRddOperator = new SqlToRddOperator(HsqldbPlatform.getInstance());
157+
evaluate(
158+
sqlToRddOperator,
159+
new ChannelInstance[]{sqlQueryChannelInstance},
160+
new ChannelInstance[]{rddChannelInstance}
161+
);
162+
163+
List<Record> output = rddChannelInstance.<Record>provideRdd().collect();
164+
Assert.assertTrue(output.isEmpty());
165+
}
166+
167+
}

wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/channels/ChannelConversions.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import org.apache.wayang.core.optimizer.channels.ChannelConversion;
2222
import org.apache.wayang.core.optimizer.channels.DefaultChannelConversion;
2323
import org.apache.wayang.java.channels.StreamChannel;
24+
import org.apache.wayang.jdbc.operators.SqlToRddOperator;
2425
import org.apache.wayang.jdbc.operators.SqlToStreamOperator;
2526
import org.apache.wayang.postgres.platform.PostgresPlatform;
27+
import org.apache.wayang.spark.channels.RddChannel;
2628

29+
import java.util.Arrays;
2730
import java.util.Collection;
28-
import java.util.Collections;
2931

3032
/**
3133
* Register for the {@link ChannelConversion}s supported for this platform.
@@ -38,8 +40,15 @@ public class ChannelConversions {
3840
() -> new SqlToStreamOperator(PostgresPlatform.getInstance())
3941
);
4042

41-
public static final Collection<ChannelConversion> ALL = Collections.singleton(
42-
SQL_TO_STREAM_CONVERSION
43+
public static final ChannelConversion SQL_TO_UNCACHED_RDD_CONVERSION = new DefaultChannelConversion(
44+
PostgresPlatform.getInstance().getSqlQueryChannelDescriptor(),
45+
RddChannel.UNCACHED_DESCRIPTOR,
46+
() -> new SqlToRddOperator(PostgresPlatform.getInstance())
47+
);
48+
49+
public static final Collection<ChannelConversion> ALL = Arrays.asList(
50+
SQL_TO_STREAM_CONVERSION,
51+
SQL_TO_UNCACHED_RDD_CONVERSION
4352
);
4453

4554
}

0 commit comments

Comments
 (0)