Skip to content

Commit 4e78b4c

Browse files
committed
Use file based schema and named args for SqlContext main
1 parent 7312ade commit 4e78b4c

5 files changed

Lines changed: 177 additions & 28 deletions

File tree

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
<scala.mayor.version>2.12</scala.mayor.version>
107107
<spark.version>3.4.4</spark.version>
108108
<flink.version>1.20.0</flink.version>
109-
<calcite.version>1.39.0</calcite.version>
109+
<calcite.version>1.39.0</calcite.version>
110110

111111
<java.version>17</java.version>
112112
<source.level>17</source.level>
@@ -785,6 +785,12 @@
785785
<artifactId>guava</artifactId>
786786
<version>${guava.version}</version>
787787
</dependency>
788+
<!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
789+
<dependency>
790+
<groupId>commons-cli</groupId>
791+
<artifactId>commons-cli</artifactId>
792+
<version>1.3.1</version>
793+
</dependency>
788794
</dependencies>
789795
</dependencyManagement>
790796

wayang-api/wayang-api-scala-java/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@
105105
<artifactId>wayang-spark</artifactId>
106106
<version>1.0.1-SNAPSHOT</version>
107107
</dependency>
108+
<dependency>
109+
<groupId>org.apache.wayang</groupId>
110+
<artifactId>wayang-tensorflow</artifactId>
111+
<version>1.0.1-SNAPSHOT</version>
112+
</dependency>
108113
<dependency>
109114
<groupId>org.apache.spark</groupId>
110115
<artifactId>spark-core_2.12</artifactId>
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.api.util
20+
21+
import org.apache.wayang.commons.util.profiledb.model.Experiment
22+
import org.apache.wayang.basic.WayangBasics
23+
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
24+
import org.apache.wayang.core.plugin.{DynamicPlugin, Plugin}
25+
import org.apache.wayang.java.Java
26+
import org.apache.wayang.tensorflow.Tensorflow
27+
import org.apache.wayang.postgres.Postgres
28+
import org.apache.wayang.spark.Spark
29+
import org.apache.wayang.sqlite3.Sqlite3
30+
31+
/**
32+
* Utility to parse parameters of the apps.
33+
*/
34+
object Parameters {
35+
36+
private val yamlId = """yaml\((.*)\)""".r
37+
38+
val yamlPluginHel = "yaml(<YAML plugin URL>)"
39+
40+
private val intPattern = """[+-]?\d+""".r
41+
42+
private val longPattern = """[+-]?\d+L""".r
43+
44+
private val doublePattern = """[+-]?\d+\.\d*""".r
45+
46+
private val booleanPattern = """(?:true)|(?:false)""".r
47+
48+
private val probabilisticDoubleIntervalPattern = """(\d+)\.\.(\d+)(~\d+.\d+)?""".r
49+
50+
private val experiment =
51+
"""exp\(([^,;]+)(?:;tags=([^,;]+(?:,[^,;]+)*))?(?:;conf=([^,;:]+:[^,;:]+(?:,[^,;:]+:[^,;:]+)*))?\)""".r
52+
53+
val experimentHelp = "exp(<ID>[,tags=<tag>,...][,conf=<key>:<value>,...])"
54+
55+
/**
56+
* Load a plugin.
57+
*
58+
* @param id name of the plugin
59+
* @return the loaded [[Plugin]]
60+
*/
61+
def loadPlugin(id: String): Plugin = id match {
62+
case "basic-graph" => WayangBasics.graphPlugin
63+
case "java" => Java.basicPlugin
64+
case "java-graph" => Java.graphPlugin
65+
case "java-conversions" => Java.channelConversionPlugin
66+
case "spark" => Spark.basicPlugin
67+
case "spark-graph" => Spark.graphPlugin
68+
case "spark-conversions" => Spark.conversionPlugin
69+
case "postgres" => Postgres.plugin
70+
case "postgres-conversions" => Postgres.conversionPlugin
71+
case "sqlite3" => Sqlite3.plugin
72+
case "sqlite3-conversions" => Sqlite3.conversionPlugin
73+
case "tensorflow" => Tensorflow.plugin
74+
case "tensorflow-conversions" => Tensorflow.channelConversionPlugin
75+
case yamlId(url) => DynamicPlugin.loadYaml(url)
76+
case other => throw new IllegalArgumentException(s"Could not load platform '$other'.")
77+
}
78+
79+
/**
80+
* Loads the specified [[Plugin]]s..
81+
*
82+
* @param platformIds a comma-separated list of platform IDs
83+
* @return the loaded [[Plugin]]s
84+
*/
85+
def loadPlugins(platformIds: String): Seq[Plugin] = loadPlugins(platformIds.split(","))
86+
87+
/**
88+
* Loads the specified [[Plugin]]s.
89+
*
90+
* @param platformIds platform IDs
91+
* @return the loaded [[Plugin]]s
92+
*/
93+
def loadPlugins(platformIds: Seq[String]): Seq[Plugin] = platformIds.map(loadPlugin)
94+
95+
96+
/**
97+
* Parses a given [[String]] into a specific basic type.
98+
*
99+
* @param str the [[String]]
100+
* @return the parsed value
101+
*/
102+
def parseAny(str: String): AnyRef = {
103+
str match {
104+
case "null" => null
105+
case intPattern() => java.lang.Integer.valueOf(str)
106+
case longPattern() => java.lang.Long.valueOf(str.take(str.length - 1))
107+
case doublePattern() => java.lang.Double.valueOf(str)
108+
case booleanPattern() => java.lang.Boolean.valueOf(str)
109+
case probabilisticDoubleIntervalPattern(lower, upper, conf) =>
110+
new ProbabilisticDoubleInterval(lower.toDouble, upper.toDouble, if (conf == null) 1d else conf.substring(1).toDouble)
111+
case other: String => other
112+
}
113+
}
114+
115+
}

wayang-api/wayang-api-sql/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<version>1.0.1-SNAPSHOT</version>
2626
</parent>
2727
<modelVersion>4.0.0</modelVersion>
28-
28+
2929
<artifactId>wayang-api-sql</artifactId>
3030
<dependencies>
3131
<dependency>
@@ -49,6 +49,11 @@
4949
<version>1.0.1-SNAPSHOT</version>
5050
<scope>compile</scope>
5151
</dependency>
52+
<dependency>
53+
<groupId>org.apache.wayang</groupId>
54+
<artifactId>wayang-api-scala-java</artifactId>
55+
<version>1.0.1-SNAPSHOT</version>
56+
</dependency>
5257
<dependency>
5358
<groupId>org.apache.wayang</groupId>
5459
<artifactId>wayang-postgres</artifactId>

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,22 +38,28 @@
3838
import org.apache.wayang.basic.data.Record;
3939
import org.apache.wayang.core.api.Configuration;
4040
import org.apache.wayang.core.plugin.Plugin;
41+
import org.apache.wayang.api.util.Parameters;
4142
import org.apache.wayang.core.api.WayangContext;
4243
import org.apache.wayang.core.util.ReflectionUtils;
4344
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
4445
import org.apache.wayang.java.Java;
4546
import org.apache.wayang.postgres.Postgres;
4647
import org.apache.wayang.spark.Spark;
48+
import org.apache.commons.cli.*;
49+
50+
4751

4852
import org.json.simple.JSONObject;
4953
import org.json.simple.parser.JSONParser;
5054

55+
import scala.collection.JavaConversions;
5156
import java.io.BufferedWriter;
5257
import java.io.IOException;
5358
import java.nio.file.Files;
5459
import java.nio.file.Paths;
5560
import java.sql.SQLException;
5661
import java.util.ArrayList;
62+
import java.util.Arrays;
5763
import java.util.Collection;
5864
import java.util.Properties;
5965
import java.util.concurrent.atomic.AtomicInteger;
@@ -103,19 +109,40 @@ public static void main(final String[] args) throws Exception {
103109
throw new IllegalArgumentException(
104110
"Usage: ./bin/wayang-submit org.apache.wayang.api.sql.SqlContext <SQL statement path> <JDBC driver> <JDBC URL> <JDBC user> <JDBC password> <Result output path> [platforms...]");
105111

106-
final String queryPath = args[0];
107-
final String jdbcDriver = args[1];
108-
final String jdbcUrl = args[2];
109-
final String jdbcUser = args[3];
110-
final String jdbcPassword = args[4];
111-
final String outputPath = args[5];
112+
//Specify the named arguments
113+
Options options = new Options();
114+
options.addOption("p", "platforms", true, "[platforms...]");
115+
options.addOption("q", "query", true, "SQL statement path");
116+
options.addOption("jdbcDriver", true, "JDBC driver");
117+
options.addOption("jdbcUrl", true, "JDBC URL");
118+
options.addOption("jdbcPassword", true, "JDBC URL");
119+
options.addOption("o", "outputPath", true, "Output path");
120+
options.addOption("d", "data", true, "Data path for file-based schema");
121+
options.addOption("c", "config", true, "File path for config file");
122+
123+
CommandLineParser parser = new DefaultParser();
124+
CommandLine cmd = parser.parse(options, args);
125+
126+
final String queryPath = cmd.getOptionValue("q");
127+
final String jdbcDriver = cmd.getOptionValue("jdbcDriver");
128+
final String jdbcUrl = cmd.getOptionValue("jdbcUrl");
129+
final String jdbcUser = cmd.getOptionValue("jdbcUser");
130+
final String jdbcPassword = cmd.getOptionValue("jdbcPassword");
131+
final String outputPath = cmd.getOptionValue("o");
132+
final String dataPath = cmd.getOptionValue("d");
112133

113134
final String query = StringUtils.chop(
114135
Files.readString(Paths.get(queryPath))
115136
.stripTrailing());
116137

117138
final String driverPlatform = jdbcDriver.split("\\.")[0];
118139

140+
final Configuration configuration = new Configuration();
141+
142+
if (cmd.hasOption("c")) {
143+
configuration.load(cmd.getOptionValue("c"));
144+
}
145+
119146
final String calciteModel = String.format(
120147
"{\r\n" +
121148
"\"calcite\": {\r\n" +
@@ -132,15 +159,20 @@ public static void main(final String[] args) throws Exception {
132159
" \"jdbcUser\": \"%s\",\n" +
133160
" \"jdbcPassword\": \"%s\"\n" +
134161
" }\n" +
162+
" },\n" +
163+
" {\n" +
164+
" \"name\": \"fs\",\n" +
165+
" \"type\": \"custom\", \n" +
166+
" \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\n" +
167+
" \"operand\": {\n" +
168+
" \"directory\": \"" + dataPath + "\"\n" +
169+
" }\n" +
135170
" }\n" +
136171
" ]\n" +
137172
"}\r\n" +
138173
"}",
139174
jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword);
140175

141-
final Configuration configuration = new Configuration();
142-
configuration.load(ReflectionUtils.loadResource("wayang-defaults.properties"));
143-
144176
configuration.setProperty("wayang.calcite.model", calciteModel);
145177
configuration.setProperty(String.format("wayang.%s.jdbc.url", driverPlatform), jdbcUrl);
146178
configuration.setProperty(String.format("wayang.%s.jdbc.user", driverPlatform), jdbcUser);
@@ -153,23 +185,9 @@ public static void main(final String[] args) throws Exception {
153185
final SqlContext context = new SqlContext(parseModel,
154186
List.of(Java.channelConversionPlugin(), Postgres.conversionPlugin()));
155187

156-
for (int i = 6; i < args.length; i++) {
157-
final String platform = args[i];
158-
159-
switch (platform.toLowerCase()) {
160-
case "spark":
161-
context.withPlugin(Spark.basicPlugin());
162-
break;
163-
case "java":
164-
context.withPlugin(Java.basicPlugin());
165-
break;
166-
case "postgres":
167-
context.withPlugin(Postgres.plugin());
168-
break;
169-
default:
170-
throw new IllegalArgumentException("platform not supported " + platform);
171-
}
172-
}
188+
189+
List<Plugin> plugins = JavaConversions.seqAsJavaList(Parameters.loadPlugins(cmd.getOptionValue("p")));
190+
plugins.stream().forEach(plug -> context.register(plug));
173191

174192
final Collection<Record> result = context.executeSql(query);
175193

0 commit comments

Comments
 (0)