Skip to content

Commit 42e2fa6

Browse files
authored
Merge pull request #699 from Spatial-Data-MP/main
Extend Wayang with spatial operators
2 parents e726cdf + e75b6ed commit 42e2fa6

54 files changed

Lines changed: 5520 additions & 16 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
<scala.version>2.12.17</scala.version>
9797
<scala.mayor.version>2.12</scala.mayor.version>
9898
<spark.version>3.4.4</spark.version>
99+
<sedona.version>1.6.1</sedona.version>
99100
<flink.version>1.20.0</flink.version>
100101
<calcite.version>1.39.0</calcite.version>
101102

wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
3434
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
3535
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
3636
import org.apache.wayang.core.plan.wayangplan._
37+
import org.apache.wayang.core.api.spatial.{SpatialGeometry, SpatialPredicate}
3738
import org.apache.wayang.core.platform.Platform
3839
import org.apache.wayang.core.util.{Tuple => WayangTuple}
3940
import org.apache.wayang.basic.data.{Record, Tuple2 => WayangTuple2}
40-
import org.apache.wayang.basic.model.{DLModel, LogisticRegressionModel,DecisionTreeRegressionModel};
41+
import org.apache.wayang.basic.model.{DLModel, LogisticRegressionModel,DecisionTreeRegressionModel}
4142
import org.apache.wayang.commons.util.profiledb.model.Experiment
42-
import com.google.protobuf.ByteString;
43+
import com.google.protobuf.ByteString
4344
import org.apache.wayang.api.python.function._
4445
import org.tensorflow.ndarray.NdArray
4546

@@ -632,6 +633,81 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
632633
joinOperator
633634
}
634635

636+
/**
637+
* Applies a spatial filter to this instance.
638+
*
639+
* @param keySelector UDF to extract spatial geometry from data quanta
640+
* @param predicateType the spatial predicate type
641+
* @param filterGeometry the geometry to filter against
642+
* @param columnName optional SQL column name for database pushdown
643+
* @return a new instance representing the filtered output
644+
*/
645+
def spatialFilter(keySelector: Out => SpatialGeometry,
646+
predicateType: SpatialPredicate,
647+
filterGeometry: SpatialGeometry,
648+
columnName: String = null): DataQuanta[Out] =
649+
spatialFilterJava(toSerializableFunction(keySelector), predicateType, filterGeometry, columnName)
650+
651+
/**
652+
* Applies a spatial filter to this instance.
653+
*
654+
* @param keySelector UDF to extract spatial geometry from data quanta
655+
* @param predicateType the spatial predicate type
656+
* @param filterGeometry the geometry to filter against
657+
* @param columnName optional SQL column name for database pushdown
658+
* @return a new instance representing the filtered output
659+
*/
660+
def spatialFilterJava(keySelector: SerializableFunction[Out, _ <: SpatialGeometry],
661+
predicateType: SpatialPredicate,
662+
filterGeometry: SpatialGeometry,
663+
columnName: String = null): DataQuanta[Out] = {
664+
val op = new SpatialFilterOperator(predicateType, keySelector, dataSetType[Out], filterGeometry)
665+
if (columnName != null) op.getKeyDescriptor.withSqlImplementation(null, columnName)
666+
this.connectTo(op, 0)
667+
wrap[Out](op)
668+
}
669+
670+
/**
671+
* Feeds this and a further instance into a [[SpatialJoinOperator]].
672+
*
673+
* @param thisKeyUdf UDF to extract spatial geometry from this instance's elements
674+
* @param that the other instance
675+
* @param thatKeyUdf UDF to extract spatial geometry from `that` instance's elements
676+
* @param predicateType the spatial predicate type for the join
677+
* @return a new instance representing the SpatialJoinOperator's output
678+
*/
679+
def spatialJoin[ThatOut: ClassTag](
680+
thisKeyUdf: Out => SpatialGeometry,
681+
that: DataQuanta[ThatOut],
682+
thatKeyUdf: ThatOut => SpatialGeometry,
683+
predicateType: SpatialPredicate): DataQuanta[WayangTuple2[Out, ThatOut]] =
684+
spatialJoinJava(toSerializableFunction(thisKeyUdf), that, toSerializableFunction(thatKeyUdf), predicateType)
685+
686+
/**
687+
* Feeds this and a further instance into a [[SpatialJoinOperator]].
688+
*
689+
* @param thisKeyUdf UDF to extract spatial geometry from this instance's elements
690+
* @param that the other instance
691+
* @param thatKeyUdf UDF to extract spatial geometry from `that` instance's elements
692+
* @param predicateType the spatial predicate type for the join
693+
* @return a new instance representing the SpatialJoinOperator's output
694+
*/
695+
def spatialJoinJava[ThatOut: ClassTag](
696+
thisKeyUdf: SerializableFunction[Out, _ <: SpatialGeometry],
697+
that: DataQuanta[ThatOut],
698+
thatKeyUdf: SerializableFunction[ThatOut, _ <: SpatialGeometry],
699+
predicateType: SpatialPredicate): DataQuanta[WayangTuple2[Out, ThatOut]] = {
700+
require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
701+
val op = new SpatialJoinOperator(
702+
new TransformationDescriptor(thisKeyUdf.asInstanceOf[SerializableFunction[Out, SpatialGeometry]], basicDataUnitType[Out], basicDataUnitType[SpatialGeometry]),
703+
new TransformationDescriptor(thatKeyUdf.asInstanceOf[SerializableFunction[ThatOut, SpatialGeometry]], basicDataUnitType[ThatOut], basicDataUnitType[SpatialGeometry]),
704+
predicateType
705+
)
706+
this.connectTo(op, 0)
707+
that.connectTo(op, 1)
708+
wrap[WayangTuple2[Out, ThatOut]](op)
709+
}
710+
635711
def predict[ThatOut: ClassTag](
636712
that: DataQuanta[ThatOut],
637713
inputType: Class[_ <: Any],

wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.wayang.basic.data.{Record, Tuple2 => RT2}
3030
import org.apache.wayang.basic.model.{DLModel, Model, LogisticRegressionModel,DecisionTreeRegressionModel}
3131
import org.apache.wayang.basic.operators.{DLTrainingOperator, GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator, LogisticRegressionOperator,DecisionTreeRegressionOperator, LinearSVCOperator}
3232
import org.apache.wayang.commons.util.profiledb.model.Experiment
33+
import org.apache.wayang.core.api.spatial.{SpatialGeometry, SpatialPredicate}
3334
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializableIntUnaryOperator, SerializablePredicate}
3435
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
3536
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
@@ -281,6 +282,57 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
281282
thatKeyUdf: SerializableFunction[ThatOut, Key]) =
282283
new JoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)
283284

285+
/**
286+
* Feed the built [[DataQuanta]] into a spatial filter operator.
287+
* Requires the wayang-spatial plugin to be loaded.
288+
*
289+
* @param keyUdf function to extract geometry from elements
290+
* @param predicate the spatial predicate type
291+
* @param filterGeometry the geometry to filter against
292+
* @return a [[DataQuantaBuilder]] representing the filtered output
293+
*/
294+
def spatialFilter(
295+
keyUdf: SerializableFunction[Out, _ <: SpatialGeometry],
296+
predicate: SpatialPredicate,
297+
filterGeometry: SpatialGeometry
298+
): SpatialFilterDataQuantaBuilder[Out] =
299+
new SpatialFilterDataQuantaBuilder(this, keyUdf, predicate, filterGeometry)
300+
301+
/**
302+
* Feed the built [[DataQuanta]] into a spatial filter operator with SQL pushdown support.
303+
*
304+
* @param keyUdf function to extract geometry from elements
305+
* @param predicate the spatial predicate type
306+
* @param filterGeometry the geometry to filter against
307+
* @param sqlGeometryColumn the name of the geometry column in the database for SQL pushdown
308+
* @return a [[SpatialFilterDataQuantaBuilder]] representing the filtered output
309+
*/
310+
def spatialFilter(
311+
keyUdf: SerializableFunction[Out, _ <: SpatialGeometry],
312+
predicate: SpatialPredicate,
313+
filterGeometry: SpatialGeometry,
314+
sqlGeometryColumn: String
315+
): SpatialFilterDataQuantaBuilder[Out] =
316+
new SpatialFilterDataQuantaBuilder(this, keyUdf, predicate, filterGeometry)
317+
.withSqlGeometryColumnName(sqlGeometryColumn)
318+
319+
/**
320+
* Feed the built [[DataQuanta]] of this and the given instance into a spatial join operator.
321+
*
322+
* @param thisKeyUdf function to extract geometry from this instance's elements
323+
* @param that the other [[DataQuantaBuilder]] to join with
324+
* @param thatKeyUdf function to extract geometry from `that` instance's elements
325+
* @param predicate the spatial predicate type
326+
* @return a [[SpatialJoinDataQuantaBuilder]] representing the joined output as Tuple2
327+
*/
328+
def spatialJoin[ThatOut](
329+
thisKeyUdf: SerializableFunction[Out, _ <: SpatialGeometry],
330+
that: DataQuantaBuilder[_, ThatOut],
331+
thatKeyUdf: SerializableFunction[ThatOut, _ <: SpatialGeometry],
332+
predicate: SpatialPredicate
333+
): SpatialJoinDataQuantaBuilder[Out, ThatOut] =
334+
new SpatialJoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf, predicate)
335+
284336
/**
285337
* Feed the built [[DataQuanta]] of this and the given instance into a
286338
* [[org.apache.wayang.basic.operators.DLTrainingOperator]].
@@ -510,12 +562,12 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
510562
* @param catalog Iceberg Catalog
511563
* @param schema Iceberg Schema of the table to create
512564
* @param tableIdentifier Iceberg Table Identifier of the table to create
513-
* @param outputFileFormat File format of the output data files
565+
* @param outputFileFormat File format of the output data files
514566
* @return the collected data quanta
515567
*/
516568

517-
def writeIcebergTable(catalog: Catalog,
518-
schema: Schema,
569+
def writeIcebergTable(catalog: Catalog,
570+
schema: Schema,
519571
tableIdentifier: TableIdentifier,
520572
outputFileFormat: FileFormat,
521573
jobName: String): Unit = {
@@ -1959,3 +2011,41 @@ class KeyedDataQuantaBuilder[Out, Key](private val dataQuantaBuilder: DataQuanta
19592011
dataQuantaBuilder.coGroup(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)
19602012

19612013
}
2014+
2015+
class SpatialFilterDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
2016+
keySelector: SerializableFunction[T, _ <: SpatialGeometry],
2017+
predicateType: SpatialPredicate,
2018+
filterGeometry: SpatialGeometry)
2019+
(implicit javaPlanBuilder: JavaPlanBuilder)
2020+
extends BasicDataQuantaBuilder[SpatialFilterDataQuantaBuilder[T], T] {
2021+
2022+
private var columnName: String = _
2023+
2024+
def withSqlGeometryColumnName(columnName: String): SpatialFilterDataQuantaBuilder[T] = {
2025+
this.columnName = columnName
2026+
this
2027+
}
2028+
2029+
override protected def build: DataQuanta[T] = {
2030+
val dq = inputDataQuanta.dataQuanta()
2031+
dq.spatialFilterJava(keySelector, predicateType, filterGeometry, this.columnName)
2032+
}
2033+
}
2034+
2035+
class SpatialJoinDataQuantaBuilder[In0, In1](inputDataQuanta0: DataQuantaBuilder[_, In0],
2036+
inputDataQuanta1: DataQuantaBuilder[_, In1],
2037+
keyUdf0: SerializableFunction[In0, _ <: SpatialGeometry],
2038+
keyUdf1: SerializableFunction[In1, _ <: SpatialGeometry],
2039+
predicateType: SpatialPredicate)
2040+
(implicit javaPlanBuilder: JavaPlanBuilder)
2041+
extends BasicDataQuantaBuilder[SpatialJoinDataQuantaBuilder[In0, In1], RT2[In0, In1]] {
2042+
2043+
override protected def build: DataQuanta[RT2[In0, In1]] = {
2044+
val dq0 = inputDataQuanta0.dataQuanta()
2045+
val dq1 = inputDataQuanta1.dataQuanta()
2046+
applyTargetPlatforms(
2047+
dq0.spatialJoinJava(keyUdf0, dq1, keyUdf1, predicateType)(inputDataQuanta1.classTag),
2048+
this.getTargetPlatforms()
2049+
)
2050+
}
2051+
}

wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,22 @@ void testMapReduceBy() {
113113
assertEquals(WayangCollections.asSet(4 + 16, 1 + 9), WayangCollections.asSet(outputCollection));
114114
}
115115

116+
@Test
117+
void testFilter() {
118+
WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
119+
JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
120+
121+
final List<Integer> inputValues = Arrays.asList(1, 2, 3, 4, 5, 6);
122+
123+
final Collection<Integer> outputValues = builder
124+
.loadCollection(inputValues).withName("Load input values")
125+
.filter(i -> (i & 1) == 0).withName("Filter even numbers")
126+
.collect();
127+
128+
Set<Integer> expectedValues = WayangCollections.asSet(2, 4, 6);
129+
assertEquals(expectedValues, WayangCollections.asSet(outputValues));
130+
}
131+
116132
@Test
117133
void testBroadcast2() {
118134
WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/CsvRowConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public static Object convert(RelDataType fieldType, String string) {
132132
} catch (ParseException e) {
133133
return null;
134134
}
135+
case GEOMETRY:
135136
case VARCHAR:
136137
default:
137138
return string;

wayang-benchmark/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@
5454
<artifactId>wayang-postgres</artifactId>
5555
<version>1.1.2-SNAPSHOT</version>
5656
</dependency>
57+
<dependency>
58+
<groupId>org.apache.wayang</groupId>
59+
<artifactId>wayang-spatial</artifactId>
60+
<version>1.1.2-SNAPSHOT</version>
61+
</dependency>
5762
<dependency>
5863
<groupId>org.apache.wayang</groupId>
5964
<artifactId>wayang-sqlite3</artifactId>
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.apps.spatial;
20+
21+
import org.apache.wayang.api.JavaPlanBuilder;
22+
import org.apache.wayang.core.api.spatial.SpatialGeometry;
23+
import org.apache.wayang.spark.Spark;
24+
import org.apache.wayang.spatial.data.WayangGeometry;
25+
import org.apache.wayang.core.api.Configuration;
26+
import org.apache.wayang.core.api.WayangContext;
27+
import org.apache.wayang.core.api.spatial.SpatialPredicate;
28+
import org.apache.wayang.java.Java;
29+
import org.apache.wayang.spatial.Spatial;
30+
31+
import java.util.Arrays;
32+
import java.util.Collection;
33+
34+
public class SpatialFilter {
35+
public static void main(String[] args) {
36+
System.out.println("Running Spatial Filter Benchmark with args " + Arrays.toString(args));
37+
38+
String fileUrl = args[1];
39+
String platform = args[2];
40+
String selectivity = args[3];
41+
42+
WayangContext wayangContext = new WayangContext(new Configuration())
43+
.withPlugin(Java.basicPlugin())
44+
.withPlugin(Spark.basicPlugin())
45+
.withPlugin(Spatial.plugin());
46+
47+
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
48+
.withJobName("filter test")
49+
.withUdfJarOf(SpatialFilter.class);
50+
51+
SpatialGeometry queryGeometry = WayangGeometry.fromStringInput(
52+
"POLYGON((0.0 0.0, " + selectivity + " 0.0, " + selectivity + " " + selectivity + ", 0.0 " + selectivity + ", 0.0 0.0))"
53+
);
54+
55+
Collection<Long> outputcount =
56+
planBuilder.readTextFile(fileUrl)
57+
.spatialFilter(
58+
(input -> WayangGeometry.fromStringInput((input.split("\",")[0]).replace("\"", ""))),
59+
SpatialPredicate.INTERSECTS,
60+
queryGeometry
61+
).withTargetPlatform(
62+
switch (platform) {
63+
case "java" -> Java.platform();
64+
case "spark" -> Spark.platform();
65+
default -> Java.platform();
66+
}
67+
)
68+
.count()
69+
.collect();
70+
71+
System.out.println("Spatial Filter Result Size: " + outputcount);
72+
}
73+
}

0 commit comments

Comments
 (0)