
Commit 5cee47f

Merge branch 'apache:main' into main

2 parents 75e605d + f68bf88

File tree

9 files changed: +337 −14


README.md

+3 −3

````diff
@@ -60,8 +60,8 @@ You first have to build the binaries as shown [here](guides/tutorial.md).
 Once you have the binaries built, follow these steps to install Wayang:
 
 ```shell
-tar -xvf wayang-0.6.1-snapshot.tar.gz
-cd wayang-0.6.1-SNAPSHOT
+tar -xvf wayang-0.7.1-snapshot.tar.gz
+cd wayang-0.7.1-SNAPSHOT
 ```
 
 In linux
@@ -100,7 +100,7 @@ Wayang is available via Maven Central. To use it with Maven, include the followi
 <dependency>
   <groupId>org.apache.wayang</groupId>
   <artifactId>wayang-***</artifactId>
-  <version>0.6.0</version>
+  <version>0.7.1</version>
 </dependency>
 ```
 Note the `***`: Wayang ships with multiple modules that can be included in your app, depending on how you want to use it:
````
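As a quick illustration of how that module choice surfaces in application code: each platform module contributes a plugin that you register on the `WayangContext`. The sketch below is not part of this commit and assumes the `wayang-java` and `wayang-spark` modules are on the classpath.

```java
// Illustrative only: how Maven modules map to runtime plugins.
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;   // shipped by the wayang-java module
import org.apache.wayang.spark.Spark; // shipped by the wayang-spark module

public class ContextSetup {
    public static WayangContext createContext() {
        // Register one plugin per platform Wayang should consider at optimization time.
        return new WayangContext()
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());
    }
}
```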

guides/tutorial.md

+4 −4

````diff
@@ -29,14 +29,14 @@ Running following commands to build Wayang and generate the tar.gz
 cd incubator-wayang
 ./mvnw clean package -pl :wayang-assembly -Pdistribution
 ```
-Then you can find the `wayang-assembly-0.6.1-SNAPSHOT-dist.tar.gz` under `wayang-assembly/target` directory.
+Then you can find the `wayang-assembly-0.7.1-SNAPSHOT-dist.tar.gz` under `wayang-assembly/target` directory.
 
 
 # Prepare the environment
 ## Wayang
 ```shell
-tar -xvf wayang-assembly-0.6.1-SNAPSHOT-dist.tar.gz
-cd wayang-0.6.1-SNAPSHOT
+tar -xvf wayang-assembly-0.7.1-SNAPSHOT-dist.tar.gz
+cd wayang-0.7.1-SNAPSHOT
 ```
 
 In linux
@@ -60,7 +60,7 @@ source ~/.zshrc
 To execute the WordCount example with Apache Wayang, you need to execute your program with the 'wayang-submit' command:
 
 ```shell
-cd wayang-0.6.1-SNAPSHOT
+cd wayang-0.7.1-SNAPSHOT
 ./bin/wayang-submit org.apache.wayang.apps.wordcount.Main java file://$(pwd)/README.md
 ```
 Then you should be able to see outputs like this:
````
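For context on what `org.apache.wayang.apps.wordcount.Main` roughly does at the API level, here is a condensed WordCount sketch in the style of the example from the Wayang README; treat it as illustrative rather than the shipped app's exact code.

```java
import java.util.Arrays;
import java.util.Collection;

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;

public class WordCountSketch {
    public static void main(String[] args) {
        // Register the Java platform; wayang-submit derives this from the
        // "java" argument on the command line above.
        WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName("WordCount");

        Collection<Tuple2<String, Integer>> wordCounts = planBuilder
                .readTextFile(args[0])                              // e.g. file://.../README.md
                .flatMap(line -> Arrays.asList(line.split("\\W+"))) // tokenize lines
                .filter(token -> !token.isEmpty())                  // drop empty tokens
                .map(word -> new Tuple2<>(word.toLowerCase(), 1))   // attach counters
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .collect();

        wordCounts.forEach(System.out::println);
    }
}
```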

wayang-platforms/wayang-jdbc-template/pom.xml

+5 −0

```diff
@@ -78,6 +78,11 @@
             <artifactId>hadoop-common</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-spark_2.12</artifactId>
+            <version>0.7.1-SNAPSHOT</version>
+        </dependency>
     </dependencies>
```
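This compile-scope dependency on `wayang-spark_2.12` is what lets the module's main sources (notably the new `SqlToRddOperator` below) and its tests reference Spark-side classes such as `JavaRDD`, `RddChannel`, and `SparkExecutor`.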

wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToRddOperator.java

+118 (new file)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.jdbc.operators;

import org.apache.spark.api.java.JavaRDD;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.JsonSerializable;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.json.WayangJsonObj;
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
import org.apache.wayang.spark.channels.RddChannel;
import org.apache.wayang.spark.execution.SparkExecutor;
import org.apache.wayang.spark.operators.SparkExecutionOperator;

import java.sql.Connection;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SqlToRddOperator extends UnaryToUnaryOperator<Record, Record> implements SparkExecutionOperator, JsonSerializable {

    private final JdbcPlatformTemplate jdbcPlatform;

    public SqlToRddOperator(JdbcPlatformTemplate jdbcPlatform) {
        this(jdbcPlatform, DataSetType.createDefault(Record.class));
    }

    public SqlToRddOperator(JdbcPlatformTemplate jdbcPlatform, DataSetType<Record> dataSetType) {
        super(dataSetType, dataSetType, false);
        this.jdbcPlatform = jdbcPlatform;
    }

    protected SqlToRddOperator(SqlToRddOperator that) {
        super(that);
        this.jdbcPlatform = that.jdbcPlatform;
    }

    @Override
    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
        return Collections.singletonList(this.jdbcPlatform.getSqlQueryChannelDescriptor());
    }

    @Override
    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
        return Collections.singletonList(RddChannel.UNCACHED_DESCRIPTOR);
    }

    @Override
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
            ChannelInstance[] inputs,
            ChannelInstance[] outputs,
            SparkExecutor executor,
            OptimizationContext.OperatorContext operatorContext) {
        // Cast the inputs and outputs.
        final SqlQueryChannel.Instance input = (SqlQueryChannel.Instance) inputs[0];
        final RddChannel.Instance output = (RddChannel.Instance) outputs[0];

        JdbcPlatformTemplate producerPlatform = (JdbcPlatformTemplate) input.getChannel().getProducer().getPlatform();
        final Connection connection = producerPlatform
                .createDatabaseDescriptor(executor.getConfiguration())
                .createJdbcConnection();

        Iterator<Record> resultSetIterator = new SqlToStreamOperator.ResultSetIterator(connection, input.getSqlQuery());
        Iterable<Record> resultSetIterable = () -> resultSetIterator;

        // Convert the ResultSet to a JavaRDD.
        JavaRDD<Record> resultSetRDD = executor.sc.parallelize(
                StreamSupport.stream(resultSetIterable.spliterator(), false).collect(Collectors.toList()),
                executor.getNumDefaultPartitions()
        );

        output.accept(resultSetRDD, executor);

        // TODO: Add load profile estimators
        ExecutionLineageNode queryLineageNode = new ExecutionLineageNode(operatorContext);
        queryLineageNode.addPredecessor(input.getLineage());
        ExecutionLineageNode outputLineageNode = new ExecutionLineageNode(operatorContext);
        output.getLineage().addPredecessor(outputLineageNode);

        return queryLineageNode.collectAndMark();
    }

    @Override
    public boolean containsAction() {
        return false;
    }

    @Override
    public WayangJsonObj toJson() {
        return null;
    }
}
```
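The heart of `evaluate` is a driver-side materialization: the JDBC `ResultSet` is drained into a local list and then handed to `JavaSparkContext.parallelize`, so the full query result must fit in driver memory. A self-contained sketch of that same pattern with plain JDBC and Spark follows; the table, column, and query are hypothetical.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ResultSetToRddSketch {

    /** Drains a query result on the driver, then parallelizes it into an RDD. */
    public static JavaRDD<String> toRdd(JavaSparkContext sc, Connection connection) throws SQLException {
        List<String> rows = new ArrayList<>();
        try (Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SELECT b FROM someTable")) { // hypothetical query
            while (resultSet.next()) {
                rows.add(resultSet.getString(1)); // one driver-side copy per row
            }
        }
        // As in SqlToRddOperator, spread the materialized rows over the default partitions.
        return sc.parallelize(rows, sc.defaultParallelism());
    }
}
```

The trade-off is simplicity over scalability: this conversion is straightforward and correct, but unsuitable for query results larger than driver memory, which the `// TODO: Add load profile estimators` note hints is still unaccounted for in cost estimation.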

wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/SqlToStreamOperator.java

+1 −1

```diff
@@ -144,7 +144,7 @@ public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
     /**
      * Exposes a {@link ResultSet} as an {@link Iterator}.
      */
-    private static class ResultSetIterator implements Iterator<Record>, AutoCloseable {
+    public static class ResultSetIterator implements Iterator<Record>, AutoCloseable {
 
         /**
          * Keeps around the {@link ResultSet} of the SQL query.
```
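Widening `ResultSetIterator` from `private` to `public` is what allows the new `SqlToRddOperator` above to instantiate `SqlToStreamOperator.ResultSetIterator` directly instead of duplicating the JDBC iteration logic.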

wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/OperatorTestBase.java

+15 −1

```diff
@@ -18,7 +18,6 @@
 
 package org.apache.wayang.jdbc.operators;
 
-import org.junit.BeforeClass;
 import org.apache.wayang.core.api.Configuration;
 import org.apache.wayang.core.api.Job;
 import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
@@ -31,6 +30,10 @@
 import org.apache.wayang.java.execution.JavaExecutor;
 import org.apache.wayang.java.operators.JavaExecutionOperator;
 import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.wayang.spark.execution.SparkExecutor;
+import org.apache.wayang.spark.operators.SparkExecutionOperator;
+import org.apache.wayang.spark.platform.SparkPlatform;
+import org.junit.BeforeClass;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -52,6 +55,11 @@ protected static JavaExecutor createJavaExecutor() {
         return new JavaExecutor(JavaPlatform.getInstance(), job);
     }
 
+    protected static SparkExecutor createSparkExecutor() {
+        final Job job = createJob();
+        return new SparkExecutor(SparkPlatform.getInstance(), job);
+    }
+
     private static Job createJob() {
         final Job job = mock(Job.class);
         when(job.getConfiguration()).thenReturn(configuration);
@@ -70,4 +78,10 @@ protected static void evaluate(JavaExecutionOperator operator,
         operator.evaluate(inputs, outputs, createJavaExecutor(), createOperatorContext(operator));
     }
 
+    protected static void evaluate(SparkExecutionOperator operator,
+                                   ChannelInstance[] inputs,
+                                   ChannelInstance[] outputs) {
+        operator.evaluate(inputs, outputs, createSparkExecutor(), createOperatorContext(operator));
+    }
+
 }
```
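The `createSparkExecutor` and Spark-flavored `evaluate` overloads mirror the existing Java-platform helpers, so a `SparkExecutionOperator` can be driven from tests the same way; `SqlToRddOperatorTest` below relies on them.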
wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/SqlToRddOperatorTest.java

+167 (new file)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.jdbc.operators;

import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.function.PredicateDescriptor;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.CrossPlatformExecutor;
import org.apache.wayang.core.profiling.FullInstrumentationStrategy;
import org.apache.wayang.jdbc.channels.SqlQueryChannel;
import org.apache.wayang.jdbc.test.HsqldbFilterOperator;
import org.apache.wayang.jdbc.test.HsqldbPlatform;
import org.apache.wayang.spark.channels.RddChannel;
import org.apache.wayang.spark.execution.SparkExecutor;
import org.apache.wayang.spark.platform.SparkPlatform;
import org.junit.Assert;
import org.junit.Test;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SqlToRddOperatorTest extends OperatorTestBase {

    @Test
    public void testWithHsqldb() throws SQLException {
        Configuration configuration = new Configuration();

        Job job = mock(Job.class);
        when(job.getConfiguration()).thenReturn(configuration);

        CrossPlatformExecutor cpe = new CrossPlatformExecutor(job, new FullInstrumentationStrategy());
        when(job.getCrossPlatformExecutor()).thenReturn(cpe);
        final SparkExecutor sparkExecutor = new SparkExecutor(SparkPlatform.getInstance(), job);

        HsqldbPlatform hsqldbPlatform = new HsqldbPlatform();

        // Create some test data.
        try (Connection jdbcConnection = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
            final Statement statement = jdbcConnection.createStatement();
            statement.execute("CREATE TABLE testSqlToRddWithHsqldb (a INT, b VARCHAR(6));");
            statement.execute("INSERT INTO testSqlToRddWithHsqldb VALUES (0, 'zero');");
            statement.execute("INSERT INTO testSqlToRddWithHsqldb VALUES (1, 'one');");
            statement.execute("INSERT INTO testSqlToRddWithHsqldb VALUES (2, 'two');");
        }

        final ExecutionOperator filterOperator = new HsqldbFilterOperator(
                new PredicateDescriptor<>(x -> false, Record.class)
        );
        final SqlQueryChannel sqlQueryChannel = new SqlQueryChannel(
                HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor(),
                filterOperator.getOutput(0)
        );
        SqlQueryChannel.Instance sqlQueryChannelInstance = sqlQueryChannel.createInstance(
                hsqldbPlatform.createExecutor(job),
                mock(OptimizationContext.OperatorContext.class),
                0
        );
        sqlQueryChannelInstance.setSqlQuery("SELECT * FROM testSqlToRddWithHsqldb;");
        ExecutionTask producer = new ExecutionTask(filterOperator);
        producer.setOutputChannel(0, sqlQueryChannel);

        RddChannel.Instance rddChannelInstance =
                new RddChannel(RddChannel.UNCACHED_DESCRIPTOR, mock(OutputSlot.class)).createInstance(
                        sparkExecutor,
                        mock(OptimizationContext.OperatorContext.class),
                        0
                );

        SqlToRddOperator sqlToRddOperator = new SqlToRddOperator(HsqldbPlatform.getInstance());
        evaluate(
                sqlToRddOperator,
                new ChannelInstance[]{sqlQueryChannelInstance},
                new ChannelInstance[]{rddChannelInstance}
        );

        List<Record> output = rddChannelInstance.<Record>provideRdd().collect();
        List<Record> expected = Arrays.asList(
                new Record(0, "zero"),
                new Record(1, "one"),
                new Record(2, "two")
        );

        Assert.assertEquals(expected, output);
    }

    @Test
    public void testWithEmptyHsqldb() throws SQLException {
        Configuration configuration = new Configuration();

        Job job = mock(Job.class);
        when(job.getConfiguration()).thenReturn(configuration);

        CrossPlatformExecutor cpe = new CrossPlatformExecutor(job, new FullInstrumentationStrategy());
        when(job.getCrossPlatformExecutor()).thenReturn(cpe);
        final SparkExecutor sparkExecutor = new SparkExecutor(SparkPlatform.getInstance(), job);

        HsqldbPlatform hsqldbPlatform = new HsqldbPlatform();

        // Create an empty test table.
        try (Connection jdbcConnection = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
            final Statement statement = jdbcConnection.createStatement();
            statement.execute("CREATE TABLE testSqlToRddWithEmptyHsqldb (a INT, b VARCHAR(6));");
        }

        final ExecutionOperator filterOperator = new HsqldbFilterOperator(
                new PredicateDescriptor<>(x -> false, Record.class)
        );
        final SqlQueryChannel sqlQueryChannel = new SqlQueryChannel(
                HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor(),
                filterOperator.getOutput(0)
        );
        SqlQueryChannel.Instance sqlQueryChannelInstance = sqlQueryChannel.createInstance(
                hsqldbPlatform.createExecutor(job),
                mock(OptimizationContext.OperatorContext.class),
                0
        );
        sqlQueryChannelInstance.setSqlQuery("SELECT * FROM testSqlToRddWithEmptyHsqldb;");
        ExecutionTask producer = new ExecutionTask(filterOperator);
        producer.setOutputChannel(0, sqlQueryChannel);

        RddChannel.Instance rddChannelInstance =
                new RddChannel(RddChannel.UNCACHED_DESCRIPTOR, mock(OutputSlot.class)).createInstance(
                        sparkExecutor,
                        mock(OptimizationContext.OperatorContext.class),
                        0
                );

        SqlToRddOperator sqlToRddOperator = new SqlToRddOperator(HsqldbPlatform.getInstance());
        evaluate(
                sqlToRddOperator,
                new ChannelInstance[]{sqlQueryChannelInstance},
                new ChannelInstance[]{rddChannelInstance}
        );

        List<Record> output = rddChannelInstance.<Record>provideRdd().collect();
        Assert.assertTrue(output.isEmpty());
    }

}
```
