|
25 | 25 | import org.apache.wayang.core.api.WayangContext; |
26 | 26 | import org.apache.wayang.core.api.spatial.SpatialPredicate; |
27 | 27 | import org.apache.wayang.java.Java; |
28 | | -import org.apache.wayang.java.platform.JavaPlatform; |
29 | 28 | import org.apache.wayang.spark.Spark; |
| 29 | +import org.apache.wayang.spatial.Spatial; |
30 | 30 | import org.apache.wayang.spatial.data.WayangGeometry; |
31 | 31 |
|
32 | 32 | import java.util.Arrays; |
|
35 | 35 | public class SpatialJoin { |
36 | 36 |
|
37 | 37 | public static void main(String[] args) { |
| 38 | + System.out.println("Running Spatial Join Benchmark with args " + Arrays.toString(args)); |
38 | 39 |
|
39 | | - System.out.println(Arrays.toString(args)); |
40 | | - |
41 | | - if (args.length <= 3) { |
42 | | - System.err.print("Missing Paths: <input file1 URL> <input file2 URL> <platform>"); |
43 | | - System.exit(1); |
44 | | - } |
45 | | - |
46 | | - WayangContext wayangContext = new WayangContext(new Configuration()); |
47 | | - |
48 | | - String platform = args[2]; |
49 | | - switch (platform) { |
50 | | - case "java": |
51 | | - System.out.println("Activate only Java plugin"); |
52 | | - wayangContext.withPlugin(Java.basicPlugin()); |
53 | | - break; |
54 | | - case "spark": |
55 | | - System.out.println("Activate only Spark plugin"); |
56 | | - wayangContext.withPlugin(Spark.basicPlugin()); |
57 | | - break; |
58 | | - default: |
59 | | - System.out.println("Activate both Java and Spark plugin"); |
60 | | - wayangContext.withPlugin(Java.basicPlugin()); |
61 | | - wayangContext.withPlugin(Spark.basicPlugin()); |
62 | | - |
63 | | - } |
64 | | - |
65 | | - JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext) |
66 | | - .withJobName("Filter Test") |
67 | | - .withUdfJarOf(SpatialJoin.class) |
68 | | - .withUdfJarOf(JavaPlatform.class); |
| 40 | + WayangContext wayangContext = new WayangContext(new Configuration()) |
| 41 | + .withPlugin(Java.basicPlugin()) |
| 42 | + .withPlugin(Spark.basicPlugin()) |
| 43 | + .withPlugin(Spatial.plugin()); |
69 | 44 |
|
| 45 | + JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext); |
70 | 46 |
|
71 | 47 | String file1Url = args[1]; |
72 | 48 | String file2Url = args[2]; |
| 49 | + String platform = args[3]; |
73 | 50 | DataQuantaBuilder<UnarySourceDataQuantaBuilder<?, String>, String> table1 = planBuilder.readTextFile(file1Url); |
74 | 51 | DataQuantaBuilder<UnarySourceDataQuantaBuilder<?, String>, String> table2 = planBuilder.readTextFile(file2Url); |
75 | 52 |
|
76 | | - |
77 | | - // Query Berlin |
78 | | - WayangGeometry queryGeometry = WayangGeometry.fromStringInput( |
79 | | -// "POLYGON((-84.07287597656251 37.16644514778088, -81.79870605468751 37.16644514778088, -81.79870605468751 38.15788469869244, -84.07287597656251 38.15788469869244, -84.07287597656251 37.16644514778088))" |
80 | | - "POLYGON((12.777099609375 52.219050335542484, 13.991088867187502 52.219050335542484, 13.991088867187502 52.71766191466581, 12.777099609375 52.71766191466581, 12.777099609375 52.219050335542484))" |
81 | | -// "POLYGON((13.054504394531252 52.305791671751265, 13.23577880859375 52.33433208908722, 13.342895507812502 52.359499525558654, 13.521423339843752 52.37459311076614, 13.609313964843752 52.33433208908722, 13.669738769531252 52.320903597434054, 13.746643066406252 52.371239426380214, 13.787841796875002 52.40476481199653, 13.807067871093752 52.44830975509531, 13.807067871093752 52.48679443193377, 13.675231933593752 52.503516406073174, 13.686218261718752 52.54028236828442, 13.634033203125002 52.58035560366049, 13.537902832031252 52.612054291512536, 13.54339599609375 52.66372397759699, 13.47198486328125 52.69536233532457, 13.430786132812502 52.67871342471301, 13.359375000000002 52.645396558286066, 13.31817626953125 52.67371751370322, 13.24676513671875 52.67871342471301, 13.191833496093752 52.64872938781106, 13.16986083984375 52.612054291512536, 13.114929199218752 52.612054291512536, 13.09295654296875 52.57201003157308, 13.09295654296875 52.54529352469354, 13.08197021484375 52.50853175834131, 13.136901855468752 52.50853175834131, 13.065490722656252 52.473412273757006, 13.08197021484375 52.44663574493768, 13.046264648437502 52.40308914740344, 13.065490722656252 52.362854101276355, 13.054504394531252 52.305791671751265))" |
82 | | - ); |
83 | | - |
84 | 53 | Collection<Long> outputcount = table1 |
85 | 54 | .spatialJoin( |
86 | | - (line -> WayangGeometry.fromStringInput(line.split("\",")[0].replace("\"", ""))), |
87 | | -// line -> WGeometry.fromStringInput(line), |
| 55 | + WayangGeometry::fromStringInput, |
88 | 56 | table2, |
89 | | -// line -> WGeometry.fromStringInput(line), |
90 | | - (line -> WayangGeometry.fromStringInput(line.split("\",")[0].replace("\"", ""))), |
| 57 | + WayangGeometry::fromStringInput, |
91 | 58 | SpatialPredicate.INTERSECTS |
92 | | - ) |
93 | | - .withTargetPlatform(Spark.platform()) |
| 59 | + ).withTargetPlatform( |
| 60 | + switch (platform) { |
| 61 | + case "java" -> Java.platform(); |
| 62 | + case "spark" -> Spark.platform(); |
| 63 | + default -> Java.platform(); |
| 64 | + }) |
94 | 65 | .count() |
95 | | - .withTargetPlatform(Spark.platform()) |
96 | 66 | .collect(); |
97 | | -// .collect(); |
98 | | -// .map(pair -> pair.field0 + "\n" + pair.field1)//left.getWKT() + "\n" + right.getWKT(); |
99 | | -// .collect(); |
100 | | -// .writeTextFile("/incubator-wayang/wayang-applications/src/main/java/org/apache/wayang/applications/output.txt", input -> input, "join results"); |
101 | | - System.out.println("Spatial Join (intersects): " + outputcount); |
| 67 | + System.out.println("Spatial Join Result Size: " + outputcount); |
102 | 68 | } |
103 | 69 | } |
0 commit comments